These patterns demonstrate production-ready implementations combining all ABV features.
Production Chatbot
Complete chatbot with guardrails, streaming, and user feedback.
- Python
- JavaScript
Copy
from abvdev import ABV, observe
from typing import Generator, Optional
from dataclasses import dataclass

abv = ABV(api_key="sk-abv-...")


@dataclass
class ChatMessage:
    # Simple message container; role is expected to be
    # "system" | "user" | "assistant" (matches the dicts used below).
    role: str
    content: str


class ProductionChatbot:
    """Chatbot with guardrails, optional streaming, and per-session history.

    Each public entry point is traced via @observe(); user input is screened
    with the toxic-language guardrail before it reaches the model, and
    non-streaming output is screened again before it is returned.
    """

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt
        # session_id -> message list, seeded with the system prompt on first use.
        self.conversations = {}

    @observe()
    def chat(
        self,
        user_id: str,
        session_id: str,
        message: str,
        stream: bool = False
    ) -> str | Generator[str, None, None]:
        """Handle one user turn.

        Returns the full reply as a string, or a generator of text chunks
        when stream=True.
        """
        # Set trace context
        abv.update_current_trace(
            user_id=user_id,
            session_id=session_id,
            tags=["chatbot", "production"],
            metadata={"stream": stream}
        )
        # Input validation: refuse toxic input before calling the model.
        with abv.start_as_current_observation(as_type="guardrail", name="input-guard") as g:
            check = abv.guardrails.validate_toxic_language(message, {"sensitivity": "medium"})
            g.update(output={"status": check.status})
            if check.status == "FAIL":
                abv.score_current_trace(name="blocked_input", value=1.0, data_type="BOOLEAN")
                return "I can't respond to that. Please rephrase."
        # Get conversation history
        history = self._get_history(session_id)
        history.append({"role": "user", "content": message})
        # Generate response
        with abv.start_as_current_generation(name="chat-response", model="gpt-4o-mini") as gen:
            if stream:
                # NOTE(review): the returned generator is consumed after this
                # `with` block exits; _stream_response calls gen.update() at
                # that point and assumes the observation is still writable —
                # confirm against the SDK's context-manager semantics.
                return self._stream_response(gen, history, session_id)
            else:
                return self._sync_response(gen, history, session_id)

    def _sync_response(self, gen, history, session_id) -> str:
        """Non-streaming path: call the gateway, record usage, guard the output."""
        response = abv.gateway.complete_chat(
            provider="openai",
            model="gpt-4o-mini",
            messages=history
        )
        content = response.choices[0].message.content
        gen.update(
            output=content,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
        # Output validation: replace the reply if it fails the high-sensitivity check.
        check = abv.guardrails.validate_toxic_language(content, {"sensitivity": "high"})
        if check.status == "FAIL":
            content = "I apologize, but I need to rephrase my response."
        history.append({"role": "assistant", "content": content})
        return content

    def _stream_response(self, gen, history, session_id) -> Generator[str, None, None]:
        """Streaming path: yield chunks as they arrive, then record the full text."""
        stream = abv.gateway.complete_chat_stream(
            provider="openai",
            model="gpt-4o-mini",
            messages=history
        )
        full_response = ""
        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_response += content
                yield content
        gen.update(output=full_response)
        history.append({"role": "assistant", "content": full_response})

    def _get_history(self, session_id: str) -> list:
        """Return (lazily creating) the message history for a session."""
        if session_id not in self.conversations:
            self.conversations[session_id] = [
                {"role": "system", "content": self.system_prompt}
            ]
        return self.conversations[session_id]

    @observe()
    def submit_feedback(self, session_id: str, rating: int, comment: str = ""):
        """User submits feedback for the conversation."""
        abv.update_current_trace(session_id=session_id)
        abv.score_current_trace(
            name="user_rating",
            value=rating / 5.0,  # Normalize to 0-1
            data_type="NUMERIC",
            comment=comment
        )


# Usage
bot = ProductionChatbot("You are a helpful customer support assistant.")
response = bot.chat("user-123", "session-abc", "How do I reset my password?")
bot.submit_feedback("session-abc", rating=5, comment="Very helpful!")
Copy
import {
  startActiveObservation,
  startObservation,
  updateActiveTrace,
} from "@abvdev/tracing";

// Production chatbot example: traces each turn, guards input for toxicity,
// guards non-streaming output, and supports sync and streaming replies.
// NOTE(review): `abv` is referenced throughout but never imported in this
// snippet — presumably a configured SDK client defined elsewhere; confirm.
class ProductionChatbot {
  private systemPrompt: string;
  // sessionId -> message history, seeded with the system prompt on first use.
  private conversations = new Map<string, any[]>();

  constructor(systemPrompt: string) {
    this.systemPrompt = systemPrompt;
  }

  // Handle one user turn inside an active observation. Returns either the
  // full reply (string) or an async generator of chunks when `stream` is true.
  async chat(
    userId: string,
    sessionId: string,
    message: string,
    stream: boolean = false
  ): Promise<string | AsyncGenerator<string>> {
    return startActiveObservation("chat", async () => {
      // Attach user/session context to the active trace.
      updateActiveTrace({
        userId,
        sessionId,
        tags: ["chatbot", "production"],
        metadata: { stream },
      });
      // Input guard
      const guard = startObservation(
        "input-guard",
        { input: message },
        { asType: "guardrail" }
      );
      const check =
        await abv.guardrails.validators.toxicLanguage.validate(message, {
          sensitivity: "MEDIUM",
        });
      guard.update({ output: { status: check.status } }).end();
      if (check.status === "FAIL") {
        // Record the block and short-circuit without calling the model.
        abv.score.activeTrace({ name: "blocked_input", value: 1.0 });
        return "I can't respond to that. Please rephrase.";
      }
      const history = this.getHistory(sessionId);
      history.push({ role: "user", content: message });
      // Generation observation; ended later by syncResponse/streamResponse.
      const gen = startObservation(
        "chat-response",
        { model: "gpt-4o-mini", input: history },
        { asType: "generation" }
      );
      if (stream) {
        return this.streamResponse(gen, history, sessionId);
      } else {
        return this.syncResponse(gen, history, sessionId);
      }
    });
  }

  // Non-streaming path: call the gateway, record usage on `gen`, then run
  // the output guardrail before appending to history.
  private async syncResponse(
    gen: any,
    history: any[],
    sessionId: string
  ): Promise<string> {
    const response = await abv.gateway.chat.completions.create({
      provider: "openai",
      model: "gpt-4o-mini",
      messages: history,
    });
    // NOTE(review): completion content may be null/undefined depending on the
    // gateway's response shape — confirm before relying on it downstream.
    let content = response.choices[0].message.content;
    gen
      .update({
        output: { content },
        usageDetails: {
          input: response.usage?.prompt_tokens,
          output: response.usage?.completion_tokens,
        },
      })
      .end();
    // Output validation
    const check = await abv.guardrails.validators.toxicLanguage.validate(
      content,
      { sensitivity: "HIGH" }
    );
    if (check.status === "FAIL") {
      content = "I apologize, but I need to rephrase my response.";
    }
    history.push({ role: "assistant", content });
    return content;
  }

  // Streaming path: yield chunks as they arrive, then record the full text
  // on `gen` and append it to history. No output guardrail on this path.
  private async *streamResponse(
    gen: any,
    history: any[],
    sessionId: string
  ): AsyncGenerator<string> {
    const stream = await abv.gateway.chat.completions.create({
      provider: "openai",
      model: "gpt-4o-mini",
      messages: history,
      stream: true,
    });
    let fullResponse = "";
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        fullResponse += content;
        yield content;
      }
    }
    gen.update({ output: { content: fullResponse } }).end();
    history.push({ role: "assistant", content: fullResponse });
  }

  // Return (lazily creating) the message history for a session.
  private getHistory(sessionId: string): any[] {
    if (!this.conversations.has(sessionId)) {
      this.conversations.set(sessionId, [
        { role: "system", content: this.systemPrompt },
      ]);
    }
    return this.conversations.get(sessionId)!;
  }

  // Record a normalized (0-1) user rating as a trace score.
  async submitFeedback(
    sessionId: string,
    rating: number,
    comment: string = ""
  ) {
    await startActiveObservation("submit-feedback", async () => {
      updateActiveTrace({ sessionId });
      abv.score.activeTrace({
        name: "user_rating",
        value: rating / 5.0,
        comment,
      });
    });
  }
}

// Usage
const bot = new ProductionChatbot(
  "You are a helpful customer support assistant."
);
const response = await bot.chat(
  "user-123",
  "session-abc",
  "How do I reset my password?"
);
await bot.submitFeedback("session-abc", 5, "Very helpful!");
RAG Pipeline
Retrieval-Augmented Generation with full tracing.
- Python
- JavaScript
Copy
from abvdev import ABV, observe
from typing import List, Dict
import numpy as np

abv = ABV(api_key="sk-abv-...")


class RAGPipeline:
    """Retrieval-Augmented Generation pipeline with full tracing.

    Each stage (embedding, retrieval, generation) is traced as its own
    observation type so the pipeline can be inspected end to end.
    """

    def __init__(self, documents: List[Dict]):
        # Each document dict is expected to carry "title" and "content" keys
        # (both are read in retrieve()/query() below).
        self.documents = documents
        self.embeddings = None

    @observe(as_type="embedding")
    def embed_documents(self):
        """Create embeddings for all documents."""
        texts = [doc["content"] for doc in self.documents]
        # Simulated embedding (use actual embedding API)
        self.embeddings = np.random.rand(len(texts), 1536)
        abv.update_current_span(
            output={"num_documents": len(texts), "embedding_dim": 1536}
        )

    @observe(as_type="retriever")
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve up to top_k relevant documents."""
        abv.update_current_span(input={"query": query, "top_k": top_k})
        # Simulated retrieval (use actual vector search)
        indices = np.random.choice(len(self.documents), min(top_k, len(self.documents)), replace=False)
        results = [self.documents[i] for i in indices]
        abv.update_current_span(
            output={"num_results": len(results)},
            metadata={"retrieval_method": "cosine_similarity"}
        )
        return results

    @observe()
    def query(self, user_id: str, question: str) -> Dict:
        """Full RAG query pipeline: retrieve context, then generate an answer.

        Returns a dict with the answer, source titles, and context length.
        """
        abv.update_current_trace(
            user_id=user_id,
            tags=["rag", "production"],
            metadata={"pipeline_version": "v2"}
        )
        # Retrieve relevant context
        docs = self.retrieve(question, top_k=3)
        context = "\n\n".join([d["content"] for d in docs])
        # Generate answer
        with abv.start_as_current_generation(
            name="answer-generation",
            model="gpt-4o-mini",
            model_parameters={"temperature": 0.3}
        ) as gen:
            response = abv.gateway.complete_chat(
                provider="openai",
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": f"Answer based on this context:\n{context}"},
                    {"role": "user", "content": question}
                ],
                temperature=0.3
            )
            answer = response.choices[0].message.content
            gen.update(
                input={"question": question, "context_length": len(context)},
                output=answer,
                usage_details={
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens
                }
            )
        # Score retrieval quality (based on answer confidence)
        abv.score_current_trace(
            name="retrieval_relevance",
            value=0.85,  # Could be computed from model confidence
            data_type="NUMERIC"
        )
        return {
            "answer": answer,
            "sources": [d["title"] for d in docs],
            "context_used": len(context)
        }


# Usage
documents = [
    {"title": "Doc 1", "content": "Python is a programming language..."},
    {"title": "Doc 2", "content": "Machine learning involves..."},
    {"title": "Doc 3", "content": "Data structures are..."},
]
rag = RAGPipeline(documents)
rag.embed_documents()
result = rag.query("user-123", "What is Python?")
Copy
import {
  observe,
  startActiveObservation,
  startObservation,
  updateActiveTrace,
  updateActiveObservation,
} from "@abvdev/tracing";

interface Document {
  title: string;
  content: string;
}

// RAG pipeline example: embedding, retrieval, and generation are each traced
// as typed observations.
// NOTE(review): `abv` is referenced but never imported in this snippet —
// presumably a configured SDK client defined elsewhere; confirm.
class RAGPipeline {
  private documents: Document[];
  private embeddings: number[][] | null = null;

  constructor(documents: Document[]) {
    this.documents = documents;
  }

  // Class-property arrow function wrapped in observe() so `this` stays bound
  // to the instance.
  embedDocuments = observe(
    async () => {
      const texts = this.documents.map((d) => d.content);
      // Simulated embedding
      this.embeddings = texts.map(() =>
        Array(1536)
          .fill(0)
          .map(() => Math.random())
      );
      updateActiveObservation({
        output: { num_documents: texts.length, embedding_dim: 1536 },
        asType: "embedding",
      });
    },
    { name: "embed-documents", asType: "embedding" }
  );

  retrieve = observe(
    async (query: string, topK: number = 3): Promise<Document[]> => {
      updateActiveObservation({
        input: { query, topK },
        asType: "retriever",
      });
      // Simulated retrieval
      const indices = Array.from({ length: Math.min(topK, this.documents.length) }, () =>
        Math.floor(Math.random() * this.documents.length)
      );
      // Indices are sampled with replacement and then deduplicated, so this
      // can return fewer than topK documents.
      const results = [...new Set(indices)].map((i) => this.documents[i]);
      updateActiveObservation({
        output: { num_results: results.length },
        metadata: { retrieval_method: "cosine_similarity" },
      });
      return results;
    },
    { name: "retrieve", asType: "retriever" }
  );

  // Full RAG query: retrieve context, generate an answer, and score the trace.
  async query(userId: string, question: string) {
    return startActiveObservation("rag-query", async () => {
      updateActiveTrace({
        userId,
        tags: ["rag", "production"],
        metadata: { pipeline_version: "v2" },
      });
      // Retrieve
      const docs = await this.retrieve(question, 3);
      const context = docs.map((d) => d.content).join("\n\n");
      // Generate
      const gen = startObservation(
        "answer-generation",
        {
          model: "gpt-4o-mini",
          modelParameters: { temperature: 0.3 },
          input: { question, context_length: context.length },
        },
        { asType: "generation" }
      );
      const response = await abv.gateway.chat.completions.create({
        provider: "openai",
        model: "gpt-4o-mini",
        messages: [
          {
            role: "system",
            content: `Answer based on this context:\n${context}`,
          },
          { role: "user", content: question },
        ],
        temperature: 0.3,
      });
      const answer = response.choices[0].message.content;
      gen
        .update({
          output: { content: answer },
          usageDetails: {
            input: response.usage?.prompt_tokens,
            output: response.usage?.completion_tokens,
          },
        })
        .end();
      // Static placeholder score; could be computed from model confidence.
      abv.score.activeTrace({ name: "retrieval_relevance", value: 0.85 });
      return {
        answer,
        sources: docs.map((d) => d.title),
        contextUsed: context.length,
      };
    });
  }
}

// Usage
const documents = [
  { title: "Doc 1", content: "Python is a programming language..." },
  { title: "Doc 2", content: "Machine learning involves..." },
  { title: "Doc 3", content: "Data structures are..." },
];
const rag = new RAGPipeline(documents);
await rag.embedDocuments();
const result = await rag.query("user-123", "What is Python?");
Agent with Tools
Multi-step agent with tool calls and full observability.
- Python
- JavaScript
Copy
from abvdev import ABV, observe
from typing import Dict, Any, Callable
import json

abv = ABV(api_key="sk-abv-...")


class Agent:
    """Multi-step agent that plans with an LLM and executes named tools.

    The model replies with either a TOOL/ARGS block (executed and fed back)
    or a FINAL ANSWER line (returned to the caller).
    """

    def __init__(self, tools: Dict[str, Callable]):
        # tool name -> callable; invoked with keyword arguments parsed
        # from the model's ARGS JSON (see execute_tool()).
        self.tools = tools

    @observe(as_type="tool")
    def execute_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Execute a tool and trace it.

        Raises:
            ValueError: if tool_name is not registered.
        """
        abv.update_current_span(
            input={"tool": tool_name, "args": args},
            metadata={"tool_name": tool_name}
        )
        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")
        result = self.tools[tool_name](**args)
        abv.update_current_span(output=result)
        return result

    @observe(as_type="agent")
    def run(self, user_id: str, task: str, max_steps: int = 5) -> str:
        """Run the agent loop, up to max_steps plan/act iterations."""
        abv.update_current_trace(
            user_id=user_id,
            tags=["agent", "production"],
            metadata={"max_steps": max_steps}
        )
        messages = [
            {"role": "system", "content": self._get_system_prompt()},
            {"role": "user", "content": task}
        ]
        for step in range(max_steps):
            with abv.start_as_current_span(name=f"step-{step+1}") as step_span:
                step_span.update(metadata={"step": step + 1})
                # Get next action from LLM
                with abv.start_as_current_generation(name="plan", model="gpt-4o") as gen:
                    response = abv.gateway.complete_chat(
                        provider="openai",
                        model="gpt-4o",
                        messages=messages,
                        temperature=0
                    )
                    content = response.choices[0].message.content
                    gen.update(output=content)
                # Check if agent is done
                if "FINAL ANSWER:" in content:
                    final_answer = content.split("FINAL ANSWER:")[-1].strip()
                    abv.score_current_trace(name="steps_taken", value=step + 1, data_type="NUMERIC")
                    abv.score_current_trace(name="completed", value=1.0, data_type="BOOLEAN")
                    return final_answer
                # Parse and execute the tool call; feed the result (or the
                # error) back to the model so it can recover on the next step.
                try:
                    tool_call = self._parse_tool_call(content)
                    result = self.execute_tool(tool_call["tool"], tool_call["args"])
                    messages.append({"role": "assistant", "content": content})
                    messages.append({"role": "user", "content": f"Tool result: {result}"})
                except Exception as e:
                    messages.append({"role": "user", "content": f"Error: {e}"})
        abv.score_current_trace(name="completed", value=0.0, data_type="BOOLEAN")
        return "Max steps reached without completion."

    def _get_system_prompt(self) -> str:
        """Build the system prompt listing available tools and the reply protocol."""
        tool_descriptions = "\n".join([f"- {name}" for name in self.tools.keys()])
        return f"""You are an agent that can use tools.
Available tools:
{tool_descriptions}
To use a tool, respond with:
TOOL: tool_name
ARGS: {{"arg1": "value1"}}
When you have the final answer, respond with:
FINAL ANSWER: your answer"""

    def _parse_tool_call(self, content: str) -> Dict:
        """Parse TOOL:/ARGS: lines from the model reply into a tool-call dict.

        Raises:
            json.JSONDecodeError: if the ARGS payload is not valid JSON.
        """
        lines = content.strip().split("\n")
        tool_name = None
        args = {}
        for line in lines:
            if line.startswith("TOOL:"):
                tool_name = line.replace("TOOL:", "").strip()
            elif line.startswith("ARGS:"):
                args = json.loads(line.replace("ARGS:", "").strip())
        return {"tool": tool_name, "args": args}


# Define tools
def search_web(query: str) -> str:
    return f"Search results for '{query}': [Result 1, Result 2]"


def calculate(expression: str) -> str:
    # SECURITY: eval() on model-supplied text executes arbitrary code —
    # replace with a safe expression parser before production use.
    return str(eval(expression))


# Usage
agent = Agent(tools={"search_web": search_web, "calculate": calculate})
result = agent.run("user-123", "What is 2 + 2, then search for Python tutorials")
Copy
import {
  observe,
  startActiveObservation,
  startObservation,
  updateActiveTrace,
  updateActiveObservation,
} from "@abvdev/tracing";

type Tool = (...args: any[]) => any;

// Multi-step agent example: an LLM plans, and TOOL/ARGS replies are executed
// and fed back until a FINAL ANSWER appears or maxSteps is reached.
// NOTE(review): `abv` is referenced but never imported in this snippet —
// presumably a configured SDK client defined elsewhere; confirm.
class Agent {
  private tools: Map<string, Tool>;

  constructor(tools: Record<string, Tool>) {
    this.tools = new Map(Object.entries(tools));
  }

  // Execute one named tool inside a traced "tool" observation. The parsed
  // args object is passed as a single argument (the example tools below
  // destructure it), not spread as keyword arguments.
  executeTool = observe(
    async (toolName: string, args: Record<string, any>) => {
      updateActiveObservation({
        input: { tool: toolName, args },
        metadata: { tool_name: toolName },
        asType: "tool",
      });
      const tool = this.tools.get(toolName);
      if (!tool) {
        throw new Error(`Unknown tool: ${toolName}`);
      }
      const result = await tool(args);
      updateActiveObservation({ output: result });
      return result;
    },
    { name: "execute-tool", asType: "tool" }
  );

  // Run the plan/act loop, one traced span per step.
  async run(userId: string, task: string, maxSteps: number = 5): Promise<string> {
    return startActiveObservation(
      "agent-run",
      async () => {
        updateActiveTrace({
          userId,
          tags: ["agent", "production"],
          metadata: { max_steps: maxSteps },
        });
        const messages = [
          { role: "system", content: this.getSystemPrompt() },
          { role: "user", content: task },
        ];
        for (let step = 0; step < maxSteps; step++) {
          const stepResult = await startActiveObservation(
            `step-${step + 1}`,
            async (stepSpan) => {
              stepSpan.update({ metadata: { step: step + 1 } });
              // Get next action
              const gen = startObservation(
                "plan",
                { model: "gpt-4o" },
                { asType: "generation" }
              );
              const response = await abv.gateway.chat.completions.create({
                provider: "openai",
                model: "gpt-4o",
                messages,
                temperature: 0,
              });
              // NOTE(review): completion content may be null depending on the
              // gateway's response shape — confirm before calling .includes.
              const content = response.choices[0].message.content;
              gen.update({ output: { content } }).end();
              // Check if done
              if (content.includes("FINAL ANSWER:")) {
                const finalAnswer = content.split("FINAL ANSWER:")[1].trim();
                abv.score.activeTrace({ name: "steps_taken", value: step + 1 });
                abv.score.activeTrace({ name: "completed", value: 1.0 });
                return { done: true, answer: finalAnswer };
              }
              // Execute tool; on failure, feed the error back to the model so
              // it can recover on the next step.
              try {
                const toolCall = this.parseToolCall(content);
                const result = await this.executeTool(
                  toolCall.tool,
                  toolCall.args
                );
                messages.push({ role: "assistant", content });
                messages.push({ role: "user", content: `Tool result: ${result}` });
              } catch (e) {
                messages.push({ role: "user", content: `Error: ${e}` });
              }
              return { done: false };
            }
          );
          if (stepResult.done) {
            return stepResult.answer;
          }
        }
        abv.score.activeTrace({ name: "completed", value: 0.0 });
        return "Max steps reached without completion.";
      },
      { asType: "agent" }
    );
  }

  // Compact protocol prompt; tool names only (no descriptions).
  private getSystemPrompt(): string {
    const toolNames = [...this.tools.keys()].join(", ");
    return `You are an agent. Tools: ${toolNames}
To use: TOOL: name\\nARGS: {"arg": "val"}
When done: FINAL ANSWER: answer`;
  }

  // Extract TOOL/ARGS via regex; the ARGS JSON must fit on a single line for
  // this pattern to match. Unknown tool name falls back to "".
  private parseToolCall(content: string): { tool: string; args: any } {
    const toolMatch = content.match(/TOOL:\s*(\w+)/);
    const argsMatch = content.match(/ARGS:\s*({.*})/);
    return {
      tool: toolMatch?.[1] ?? "",
      args: argsMatch ? JSON.parse(argsMatch[1]) : {},
    };
  }
}

// Usage
const agent = new Agent({
  search_web: ({ query }: { query: string }) =>
    `Results for '${query}': [R1, R2]`,
  // SECURITY: eval() on model-supplied text executes arbitrary code — replace
  // with a safe expression parser before production use.
  calculate: ({ expression }: { expression: string }) =>
    String(eval(expression)),
});
const result = await agent.run(
  "user-123",
  "What is 2 + 2, then search for Python tutorials"
);
Error Recovery Pattern
Graceful degradation with retries and fallbacks.
- Python
- JavaScript
Copy
from abvdev import ABV, observe
import time
from typing import Optional

abv = ABV(api_key="sk-abv-...")


class ResilientGenerator:
    """LLM caller with exponential-backoff retries and a cheaper-model fallback."""

    def __init__(self, max_retries: int = 3, backoff_factor: float = 1.5):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    @observe()
    def generate(self, prompt: str, user_id: str) -> Optional[str]:
        """Generate a completion, retrying on failure; falls back when retries run out."""
        abv.update_current_trace(
            user_id=user_id,
            tags=["resilient", "production"]
        )
        last_error = None
        for attempt in range(self.max_retries):
            with abv.start_as_current_span(name=f"attempt-{attempt+1}") as span:
                span.update(metadata={"attempt": attempt + 1})
                try:
                    response = abv.gateway.complete_chat(
                        provider="openai",
                        model="gpt-4o-mini",
                        messages=[{"role": "user", "content": prompt}]
                    )
                    result = response.choices[0].message.content
                    # Validate response: treat a trivially short reply as a failure.
                    if len(result.strip()) < 10:
                        raise ValueError("Response too short")
                    abv.score_current_trace(name="retries_needed", value=attempt, data_type="NUMERIC")
                    abv.score_current_trace(name="success", value=1.0, data_type="BOOLEAN")
                    return result
                except Exception as e:
                    last_error = e
                    span.update(level="WARNING", status_message=str(e))
                    if attempt < self.max_retries - 1:
                        # Exponential backoff before the next attempt.
                        sleep_time = self.backoff_factor ** attempt
                        time.sleep(sleep_time)
        # All retries failed - try fallback
        abv.score_current_trace(name="success", value=0.0, data_type="BOOLEAN")
        abv.score_current_trace(name="used_fallback", value=1.0, data_type="BOOLEAN")
        return self._fallback_response(prompt, last_error)

    def _fallback_response(self, prompt: str, error: Exception) -> str:
        """Last-resort path: try a cheaper model, then a static apology message."""
        with abv.start_as_current_span(name="fallback") as span:
            span.update(
                level="WARNING",
                metadata={"original_error": str(error)}
            )
            # Try a simpler/cheaper model
            try:
                response = abv.gateway.complete_chat(
                    provider="openai",
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            except Exception:
                # Was a bare `except:`, which would also swallow
                # KeyboardInterrupt/SystemExit; `except Exception` keeps the
                # deliberate best-effort behavior without that hazard.
                return "I'm sorry, I couldn't process your request. Please try again later."


# Usage
generator = ResilientGenerator(max_retries=3)
result = generator.generate("Explain quantum computing", "user-123")
Copy
import { startActiveObservation, startObservation, updateActiveTrace } from "@abvdev/tracing";

/** Outcome of a single generation attempt (discriminated on `success`). */
type AttemptResult = { success: true; content: string } | { success: false };

/** LLM caller with exponential-backoff retries and a cheaper-model fallback. */
class ResilientGenerator {
  private maxRetries: number;
  private backoffFactor: number;

  constructor(maxRetries: number = 3, backoffFactor: number = 1.5) {
    this.maxRetries = maxRetries;
    this.backoffFactor = backoffFactor;
  }

  /**
   * Generate a completion, retrying on failure with exponential backoff and
   * falling back to a cheaper model when all retries are exhausted.
   */
  async generate(prompt: string, userId: string): Promise<string> {
    return startActiveObservation("resilient-generate", async () => {
      updateActiveTrace({
        userId,
        tags: ["resilient", "production"],
      });
      let lastError: Error | null = null;
      for (let attempt = 0; attempt < this.maxRetries; attempt++) {
        const result: AttemptResult = await startActiveObservation(
          `attempt-${attempt + 1}`,
          async (span) => {
            span.update({ metadata: { attempt: attempt + 1 } });
            try {
              const response = await abv.gateway.chat.completions.create({
                provider: "openai",
                model: "gpt-4o-mini",
                messages: [{ role: "user", content: prompt }],
              });
              // Guard against a null/empty completion as well as a trivially
              // short one; both raise and therefore trigger a retry.
              const content = response.choices[0].message.content ?? "";
              if (content.trim().length < 10) {
                throw new Error("Response too short");
              }
              abv.score.activeTrace({ name: "retries_needed", value: attempt });
              abv.score.activeTrace({ name: "success", value: 1.0 });
              return { success: true, content };
            } catch (error) {
              // `error` is `unknown` under strict settings — normalize to an
              // Error before reading `.message`.
              const err = error instanceof Error ? error : new Error(String(error));
              lastError = err;
              span.update({ level: "WARNING", statusMessage: err.message });
              if (attempt < this.maxRetries - 1) {
                // Exponential backoff before the next attempt (ms).
                const sleepTime = this.backoffFactor ** attempt * 1000;
                await new Promise((r) => setTimeout(r, sleepTime));
              }
              return { success: false };
            }
          }
        );
        if (result.success) {
          return result.content;
        }
      }
      // All retries failed — record the outcome and degrade gracefully.
      // (lastError is always set on the failure path; the ?? is a typed
      // safety net replacing the previous non-null assertion.)
      abv.score.activeTrace({ name: "success", value: 0.0 });
      abv.score.activeTrace({ name: "used_fallback", value: 1.0 });
      return this.fallbackResponse(prompt, lastError ?? new Error("unknown error"));
    });
  }

  /** Last-resort path: try a cheaper model, then a static apology message. */
  private async fallbackResponse(
    prompt: string,
    error: Error
  ): Promise<string> {
    const fallback = startObservation(
      "fallback",
      { metadata: { original_error: error.message } },
      {}
    );
    fallback.update({ level: "WARNING" });
    try {
      const response = await abv.gateway.chat.completions.create({
        provider: "openai",
        model: "gpt-3.5-turbo",
        messages: [{ role: "user", content: prompt }],
      });
      const content = response.choices[0].message.content;
      fallback.end();
      // Null content from the fallback model degrades to the static message.
      return content ?? "I'm sorry, I couldn't process your request. Please try again later.";
    } catch {
      fallback.end();
      return "I'm sorry, I couldn't process your request. Please try again later.";
    }
  }
}

// Usage
const generator = new ResilientGenerator(3);
const result = await generator.generate("Explain quantum computing", "user-123");