# Production Patterns

These patterns demonstrate production-ready implementations combining all ABV features.

## Production Chatbot

Complete chatbot with guardrails, streaming, and user feedback.
from abvdev import ABV, observe
from typing import Generator, Optional
from dataclasses import dataclass

# Shared ABV observability client; replace the placeholder API key in production.
abv = ABV(api_key="sk-abv-...")

@dataclass
class ChatMessage:
    """One conversation turn: a role (e.g. "user"/"assistant") and its text."""
    # NOTE(review): defined for reference only — the examples below pass plain
    # {"role": ..., "content": ...} dicts instead of this class; confirm intent.
    role: str
    content: str

class ProductionChatbot:
    """Chatbot with input/output guardrails, optional streaming, and feedback scoring.

    Conversation history is kept in memory per ``session_id``; each history
    starts with the configured system prompt.
    """

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt
        # session_id -> list of {"role", "content"} message dicts
        self.conversations = {}

    @observe()
    def chat(
        self,
        user_id: str,
        session_id: str,
        message: str,
        stream: bool = False
    ) -> str | Generator[str, None, None]:
        """Handle one user turn.

        Returns the assistant reply as a string, or a generator of text chunks
        when ``stream=True``. Toxic input is refused before any model call.
        """
        # Set trace context
        abv.update_current_trace(
            user_id=user_id,
            session_id=session_id,
            tags=["chatbot", "production"],
            metadata={"stream": stream}
        )

        # Input validation — block before touching history or the model.
        with abv.start_as_current_observation(as_type="guardrail", name="input-guard") as g:
            check = abv.guardrails.validate_toxic_language(message, {"sensitivity": "medium"})
            g.update(output={"status": check.status})
            if check.status == "FAIL":
                abv.score_current_trace(name="blocked_input", value=1.0, data_type="BOOLEAN")
                return "I can't respond to that. Please rephrase."

        # Get conversation history and record the new user turn.
        history = self._get_history(session_id)
        history.append({"role": "user", "content": message})

        if stream:
            # BUGFIX: the generation span used to be opened here, with the
            # generator returned from inside the `with` block. Generators run
            # lazily, so the span closed before any chunk was produced and
            # `gen.update(...)` ran against an already-exited observation.
            # The generator now opens its own span while it actually executes.
            return self._stream_response(history, session_id)

        with abv.start_as_current_generation(name="chat-response", model="gpt-4o-mini") as gen:
            return self._sync_response(gen, history, session_id)

    def _sync_response(self, gen, history, session_id) -> str:
        """Non-streaming completion: records usage, then validates the output."""
        response = abv.gateway.complete_chat(
            provider="openai",
            model="gpt-4o-mini",
            messages=history
        )
        content = response.choices[0].message.content

        # Record the raw model output and token usage on the generation.
        gen.update(
            output=content,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )

        # Output validation — replace a toxic reply with a safe apology.
        check = abv.guardrails.validate_toxic_language(content, {"sensitivity": "high"})
        if check.status == "FAIL":
            content = "I apologize, but I need to rephrase my response."

        history.append({"role": "assistant", "content": content})
        return content

    def _stream_response(self, history, session_id) -> Generator[str, None, None]:
        """Streaming completion: yields chunks, finalizing the span when done."""
        # The generation span lives inside the generator so it stays open for
        # the whole stream and closes only after the last chunk is yielded.
        with abv.start_as_current_generation(name="chat-response", model="gpt-4o-mini") as gen:
            chunks = abv.gateway.complete_chat_stream(
                provider="openai",
                model="gpt-4o-mini",
                messages=history
            )

            full_response = ""
            for chunk in chunks:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield content

            gen.update(output=full_response)

        history.append({"role": "assistant", "content": full_response})

    def _get_history(self, session_id: str) -> list:
        """Return (creating on first use) the message history for a session."""
        if session_id not in self.conversations:
            self.conversations[session_id] = [
                {"role": "system", "content": self.system_prompt}
            ]
        return self.conversations[session_id]

    @observe()
    def submit_feedback(self, session_id: str, rating: int, comment: str = ""):
        """User submits feedback for the conversation; rating is scored 0-1."""
        abv.update_current_trace(session_id=session_id)
        abv.score_current_trace(
            name="user_rating",
            value=rating / 5.0,  # Normalize to 0-1
            data_type="NUMERIC",
            comment=comment
        )

# Usage
bot = ProductionChatbot("You are a helpful customer support assistant.")
# Positional args: user_id, session_id, message (non-streaming by default).
response = bot.chat("user-123", "session-abc", "How do I reset my password?")
bot.submit_feedback("session-abc", rating=5, comment="Very helpful!")

## RAG Pipeline

Retrieval-Augmented Generation with full tracing.
from abvdev import ABV, observe
from typing import List, Dict
import numpy as np

# Shared ABV observability client; replace the placeholder API key in production.
abv = ABV(api_key="sk-abv-...")

class RAGPipeline:
    """Retrieval-augmented generation pipeline with tracing on every stage."""

    def __init__(self, documents: List[Dict]):
        self.documents = documents
        self.embeddings = None  # populated by embed_documents()

    @observe(as_type="embedding")
    def embed_documents(self):
        """Create embeddings for all documents."""
        contents = [entry["content"] for entry in self.documents]

        # Simulated embedding (swap in a real embedding API call)
        self.embeddings = np.random.rand(len(contents), 1536)

        abv.update_current_span(
            output={"num_documents": len(contents), "embedding_dim": 1536}
        )

    @observe(as_type="retriever")
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """Return up to ``top_k`` documents for the query."""
        abv.update_current_span(input={"query": query, "top_k": top_k})

        # Simulated retrieval (swap in a real vector search)
        sample_size = min(top_k, len(self.documents))
        picked = np.random.choice(len(self.documents), sample_size, replace=False)
        hits = [self.documents[idx] for idx in picked]

        abv.update_current_span(
            output={"num_results": len(hits)},
            metadata={"retrieval_method": "cosine_similarity"}
        )
        return hits

    @observe()
    def query(self, user_id: str, question: str) -> Dict:
        """Full RAG query pipeline: retrieve, generate, score, and report."""
        abv.update_current_trace(
            user_id=user_id,
            tags=["rag", "production"],
            metadata={"pipeline_version": "v2"}
        )

        # Retrieve relevant context and join it into one prompt section.
        hits = self.retrieve(question, top_k=3)
        context = "\n\n".join(hit["content"] for hit in hits)

        # Generate the grounded answer.
        with abv.start_as_current_generation(
            name="answer-generation",
            model="gpt-4o-mini",
            model_parameters={"temperature": 0.3}
        ) as gen:
            completion = abv.gateway.complete_chat(
                provider="openai",
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": f"Answer based on this context:\n{context}"},
                    {"role": "user", "content": question}
                ],
                temperature=0.3
            )
            answer = completion.choices[0].message.content

            gen.update(
                input={"question": question, "context_length": len(context)},
                output=answer,
                usage_details={
                    "prompt_tokens": completion.usage.prompt_tokens,
                    "completion_tokens": completion.usage.completion_tokens
                }
            )

        # Score retrieval quality (fixed placeholder; could come from model confidence).
        abv.score_current_trace(
            name="retrieval_relevance",
            value=0.85,
            data_type="NUMERIC"
        )

        return {
            "answer": answer,
            "sources": [hit["title"] for hit in hits],
            "context_used": len(context)
        }

# Usage
# Minimal in-memory corpus; each entry carries a "title" and a "content" key.
documents = [
    {"title": "Doc 1", "content": "Python is a programming language..."},
    {"title": "Doc 2", "content": "Machine learning involves..."},
    {"title": "Doc 3", "content": "Data structures are..."},
]

rag = RAGPipeline(documents)
rag.embed_documents()  # builds (simulated) embeddings before querying
result = rag.query("user-123", "What is Python?")

## Agent with Tools

Multi-step agent with tool calls and full observability.
from abvdev import ABV, observe
from typing import Dict, Any, Callable
import json

# Shared ABV observability client; replace the placeholder API key in production.
abv = ABV(api_key="sk-abv-...")

class Agent:
    """Tool-using agent that loops plan -> act until FINAL ANSWER or the step budget."""

    def __init__(self, tools: Dict[str, Callable]):
        self.tools = tools

    @observe(as_type="tool")
    def execute_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Run one tool call under its own traced span."""
        abv.update_current_span(
            input={"tool": tool_name, "args": args},
            metadata={"tool_name": tool_name}
        )

        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")

        outcome = self.tools[tool_name](**args)
        abv.update_current_span(output=outcome)
        return outcome

    @observe(as_type="agent")
    def run(self, user_id: str, task: str, max_steps: int = 5) -> str:
        """Drive the plan/act loop for up to ``max_steps`` iterations."""
        abv.update_current_trace(
            user_id=user_id,
            tags=["agent", "production"],
            metadata={"max_steps": max_steps}
        )

        transcript = [
            {"role": "system", "content": self._get_system_prompt()},
            {"role": "user", "content": task}
        ]

        for step_idx in range(max_steps):
            step_no = step_idx + 1
            with abv.start_as_current_span(name=f"step-{step_no}") as step_span:
                step_span.update(metadata={"step": step_no})

                # Ask the LLM for the next action.
                with abv.start_as_current_generation(name="plan", model="gpt-4o") as gen:
                    reply = abv.gateway.complete_chat(
                        provider="openai",
                        model="gpt-4o",
                        messages=transcript,
                        temperature=0
                    )
                    plan_text = reply.choices[0].message.content
                    gen.update(output=plan_text)

                # Done? Score the trace and hand back the final answer.
                if "FINAL ANSWER:" in plan_text:
                    final_answer = plan_text.split("FINAL ANSWER:")[-1].strip()
                    abv.score_current_trace(name="steps_taken", value=step_no, data_type="NUMERIC")
                    abv.score_current_trace(name="completed", value=1.0, data_type="BOOLEAN")
                    return final_answer

                # Otherwise parse and execute the requested tool call,
                # feeding the result (or the error) back into the transcript.
                try:
                    call = self._parse_tool_call(plan_text)
                    tool_output = self.execute_tool(call["tool"], call["args"])
                    transcript.append({"role": "assistant", "content": plan_text})
                    transcript.append({"role": "user", "content": f"Tool result: {tool_output}"})
                except Exception as exc:
                    transcript.append({"role": "user", "content": f"Error: {exc}"})

        abv.score_current_trace(name="completed", value=0.0, data_type="BOOLEAN")
        return "Max steps reached without completion."

    def _get_system_prompt(self) -> str:
        """Build the instruction prompt listing the available tool names."""
        tool_descriptions = "\n".join(f"- {name}" for name in self.tools)
        return f"""You are an agent that can use tools.

Available tools:
{tool_descriptions}

To use a tool, respond with:
TOOL: tool_name
ARGS: {{"arg1": "value1"}}

When you have the final answer, respond with:
FINAL ANSWER: your answer"""

    def _parse_tool_call(self, content: str) -> Dict:
        """Extract the TOOL:/ARGS: lines from a model reply."""
        tool_name, args = None, {}
        for raw_line in content.strip().split("\n"):
            if raw_line.startswith("TOOL:"):
                tool_name = raw_line.replace("TOOL:", "").strip()
            elif raw_line.startswith("ARGS:"):
                args = json.loads(raw_line.replace("ARGS:", "").strip())
        return {"tool": tool_name, "args": args}

# Define tools
def search_web(query: str) -> str:
    """Stub search tool: returns a canned result string for the query."""
    return "Search results for '{}': [Result 1, Result 2]".format(query)

def calculate(expression: str) -> str:
    """Safely evaluate an arithmetic expression and return the result as a string.

    SECURITY FIX: the expression comes from LLM output, so the original
    ``eval()`` was arbitrary code execution. This version walks the AST and
    only permits numeric literals, unary +/- and the basic binary operators;
    anything else raises ValueError.
    """
    import ast
    import operator

    ops = {
        ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv, ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.UAdd: operator.pos, ast.USub: operator.neg,
    }

    def _eval(node):
        # Numeric literals only — rejects strings, names, calls, etc.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    return str(_eval(ast.parse(expression, mode="eval").body))

# Usage
# Tool names here must match the TOOL: names the model is prompted to emit.
agent = Agent(tools={"search_web": search_web, "calculate": calculate})
result = agent.run("user-123", "What is 2 + 2, then search for Python tutorials")

## Error Recovery Pattern

Graceful degradation with retries and fallbacks.
from abvdev import ABV, observe
import time
from typing import Optional

# Shared ABV observability client; replace the placeholder API key in production.
abv = ABV(api_key="sk-abv-...")

class ResilientGenerator:
    """Chat completion with exponential-backoff retries and a cheaper-model fallback."""

    def __init__(self, max_retries: int = 3, backoff_factor: float = 1.5):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor  # sleep = backoff_factor ** attempt

    @observe()
    def generate(self, prompt: str, user_id: str) -> Optional[str]:
        """Try the primary model up to ``max_retries`` times, then fall back.

        Returns the model reply, the fallback model's reply, or a canned
        apology if everything fails; retry/fallback outcomes are scored on
        the trace either way.
        """
        abv.update_current_trace(
            user_id=user_id,
            tags=["resilient", "production"]
        )

        last_error = None
        for attempt in range(self.max_retries):
            with abv.start_as_current_span(name=f"attempt-{attempt+1}") as span:
                span.update(metadata={"attempt": attempt + 1})

                try:
                    response = abv.gateway.complete_chat(
                        provider="openai",
                        model="gpt-4o-mini",
                        messages=[{"role": "user", "content": prompt}]
                    )
                    result = response.choices[0].message.content

                    # Treat an implausibly short reply as a failure worth retrying.
                    if len(result.strip()) < 10:
                        raise ValueError("Response too short")

                    abv.score_current_trace(name="retries_needed", value=attempt, data_type="NUMERIC")
                    abv.score_current_trace(name="success", value=1.0, data_type="BOOLEAN")
                    return result

                except Exception as e:
                    last_error = e
                    span.update(level="WARNING", status_message=str(e))

                    # Exponential backoff before the next attempt (none after the last).
                    if attempt < self.max_retries - 1:
                        sleep_time = self.backoff_factor ** attempt
                        time.sleep(sleep_time)

        # All retries failed - record the outcome and try the fallback path.
        abv.score_current_trace(name="success", value=0.0, data_type="BOOLEAN")
        abv.score_current_trace(name="used_fallback", value=1.0, data_type="BOOLEAN")

        return self._fallback_response(prompt, last_error)

    def _fallback_response(self, prompt: str, error: Exception) -> str:
        """Best-effort answer from a cheaper model, degrading to a canned apology."""
        with abv.start_as_current_span(name="fallback") as span:
            span.update(
                level="WARNING",
                metadata={"original_error": str(error)}
            )

            # Try a simpler/cheaper model
            try:
                response = abv.gateway.complete_chat(
                    provider="openai",
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            # BUGFIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit. Keep best-effort semantics, but
            # catch only Exception and record the failure on the span.
            except Exception as fallback_error:
                span.update(status_message=str(fallback_error))
                return "I'm sorry, I couldn't process your request. Please try again later."

# Usage
generator = ResilientGenerator(max_retries=3)
# Positional args: prompt, then user_id.
result = generator.generate("Explain quantum computing", "user-123")