This guide demonstrates comprehensive observability patterns for AI pipelines using Logfire. Every component is instrumented to provide deep insights into performance, costs, and quality.
- Auto-Instrumentation
- Custom Instrumentation
- Structured Logging
- Distributed Tracing
- SQL Queries
- Dashboards
- Alerts
Logfire automatically captures detailed telemetry from popular frameworks and libraries.
```python
# Enable auto-instrumentation in observability.py
import logfire

logfire.configure()                # Call once at startup, before instrumenting

logfire.instrument_openai()        # All OpenAI API calls
logfire.instrument_anthropic()     # All Anthropic API calls
logfire.instrument_asyncpg()       # All database queries
logfire.instrument_httpx()         # All HTTP client requests
logfire.instrument_fastapi(app)    # All FastAPI endpoints
```

Note: Anthropic instrumentation includes graceful error handling for version compatibility. Even if auto-instrumentation fails, Anthropic API calls will still work normally.
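That defensive pattern might look like the sketch below; the try/except wrapper is illustrative of the approach, not code taken from the library:

```python
import logfire

try:
    logfire.instrument_anthropic()
except Exception as exc:
    # Telemetry is optional: record the failure and keep going.
    # Anthropic API calls still work, just without auto-instrumentation.
    logfire.warn("Anthropic auto-instrumentation disabled", error=str(exc))
```

Auto-instrumentation captures the following for every call: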
- Request/response bodies and headers - Full HTTP request/response data
- Token counts and costs - Automatic cost calculation for LLM calls
- SQL query text and execution time - Database performance monitoring
- HTTP status codes and latencies - API performance tracking
- Error traces with full context - Stack traces and error details
When you call OpenAI's API, Logfire automatically captures:
```python
# Your code
embedding = await openai.embeddings.create(
    model="text-embedding-3-small",
    input=question
)

# Logfire automatically tracks:
# - Request: model, input, parameters
# - Response: embedding vector, usage stats
# - Metadata: latency, cost, token count
# - Context: parent span, session ID, user info
```

Beyond auto-instrumentation, add business-specific observability.
Use the @logfire.instrument decorator to track pipeline stages:
```python
from typing import List

@logfire.instrument("embed_question")
async def embed_question(text: str) -> List[float]:
    with logfire.span("openai_embedding") as span:
        span.set_attribute("text_length", len(text))
        span.set_attribute("model", "text-embedding-3-small")
        embedding = await openai.embeddings.create(...)
        span.set_attribute("cost_usd", calculate_cost(...))
        span.set_attribute("success", True)
        return embedding
```

Track end-to-end user journeys:
```python
with logfire.span(
    "user_session.qa_request",
    session_id=session_id,
    question=question[:100],
    user_id=user_id,
):
    # All pipeline stages become children of this span,
    # which enables complete user-journey tracking
    result = await execute_pipeline(question)

    logfire.info(
        "Session completed",
        total_cost_usd=result.cost,
        quality_score=result.quality_score,
        iterations=result.iterations,
    )
```

Track business-specific KPIs:
```python
# Logfire exposes OpenTelemetry metric instruments; create them once and reuse them.

# Track cost distribution
cost_total = logfire.metric_histogram("pipeline.cost.total")
cost_by_stage = logfire.metric_histogram("pipeline.cost.by_stage")
cost_total.record(cost_usd)
cost_by_stage.record(stage_cost, attributes={"stage": stage_name})

# Track quality scores
quality_score_hist = logfire.metric_histogram("pipeline.quality_score")
quality_by_category = logfire.metric_histogram("pipeline.quality_score.by_category")
quality_score_hist.record(score)
quality_by_category.record(score, attributes={"category": question_category})

# Track iteration patterns
iteration_count = logfire.metric_histogram("pipeline.iterations")
first_pass = logfire.metric_counter("pipeline.first_pass_rate")
iteration_count.record(iterations)
first_pass.add(1 if iterations == 1 else 0)
```

Capture errors with context:
```python
try:
    result = await generate_answer(question, context)
except Exception as e:
    logfire.error(
        "Answer generation failed",
        error=str(e),
        error_type=type(e).__name__,
        question_length=len(question),
        context_length=len(context),
        attempt=attempt_number,
    )
    raise
```

Rich attributes enable powerful queries and analysis.
```python
# Good: Rich structured data
logfire.info(
    "Pipeline execution completed",
    question_length=len(question),
    total_cost_usd=total_cost,
    duration_ms=duration,
    quality_score=quality_score,
    iterations=iterations,
    passed_first_iteration=iterations == 1,
    session_id=session_id,
    stage_costs={
        "embedding": embedding_cost,
        "generation": generation_cost,
        "evaluation": eval_cost,
    },
)

# Bad: Unstructured string
logfire.info(f"Pipeline completed in {duration}ms with score {quality_score}")
```

Track these attributes consistently across your pipeline (a sketch of one way to do this follows the list):
- Identifiers: `session_id`, `request_id`, `user_id`
- Timing: `duration_ms`, `start_time`, `end_time`
- Costs: `cost_usd`, `total_cost_usd`, `cost_by_stage`
- Quality: `quality_score`, `accuracy_score`, `agreement_level`
- Metadata: `model_name`, `token_count`, `iteration_number`
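The sketch below assumes a hypothetical standard_attributes helper; the helper name and example values are illustrative, not part of Logfire:

```python
import time
import logfire

logfire.configure()

def standard_attributes(session_id: str, request_id: str, user_id: str) -> dict:
    """Hypothetical helper: one place that defines the shared attribute names."""
    return {
        "session_id": session_id,
        "request_id": request_id,
        "user_id": user_id,
        "start_time": time.time(),
    }

# Reuse the same keys in every span and log line
attrs = standard_attributes("sess-123", "req-456", "user-789")
with logfire.span("qa_pipeline", **attrs):
    logfire.info("Pipeline execution completed", **attrs, duration_ms=1234)
```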
Every request creates a trace hierarchy showing parent-child relationships.
```
user_session.qa_request [4.2s, $0.08]
├── qa_pipeline [4.1s, $0.08]
│   ├── question_embedding [0.1s, $0.0002]
│   │   └── openai.embeddings.create [0.09s, $0.0002]
│   ├── rag_retrieval [0.3s, $0.0001]
│   │   ├── hybrid_search [0.2s]
│   │   └── postgres.query [0.18s]
│   ├── answer_generation [2.1s, $0.045]
│   │   └── anthropic.messages.create [2.0s, $0.045]
│   ├── claims_extraction [0.4s, $0.008]
│   │   └── openai.chat.completions [0.38s, $0.008]
│   ├── claims_verification [0.5s, $0.0015]
│   ├── technical_accuracy [0.6s, $0.018]
│   │   └── anthropic.messages.create [0.58s, $0.018]
│   ├── quality_evaluation [0.3s, $0.007]
│   └── quality_gate [0.01s, $0]
```
- Parent-child relationships - How stages compose
- Time spent in each stage - Where time is spent
- Cost attribution - Where money is spent
- Bottleneck identification - Which stages are slow
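The hierarchy above is produced simply by nesting spans: each stage opens its span inside the parent's context, and auto-instrumented client calls attach underneath. A minimal sketch, with stage names mirroring the tree and placeholder bodies:

```python
import logfire

logfire.configure()

with logfire.span("user_session.qa_request"):
    with logfire.span("qa_pipeline"):
        with logfire.span("question_embedding"):
            ...  # auto-instrumented openai.embeddings.create lands here
        with logfire.span("rag_retrieval"):
            with logfire.span("hybrid_search"):
                ...  # auto-instrumented postgres.query lands here
        with logfire.span("answer_generation"):
            ...  # auto-instrumented anthropic.messages.create lands here
```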
Logfire stores all telemetry in a queryable database. Use SQL to analyze patterns.
Most expensive requests:

```sql
SELECT
    session_id,
    question_length,
    total_cost_usd,
    iterations
FROM logs
WHERE total_cost_usd > 0.10
ORDER BY total_cost_usd DESC
LIMIT 10;
```

Days where average quality dropped below 85:

```sql
SELECT
    DATE(timestamp) as date,
    AVG(quality_score) as avg_quality,
    COUNT(*) as requests
FROM logs
WHERE event_type = 'Pipeline execution completed'
GROUP BY DATE(timestamp)
HAVING AVG(quality_score) < 85;
```

Quality and cost by iteration count:

```sql
SELECT
    iterations,
    COUNT(*) as request_count,
    AVG(quality_score) as avg_quality,
    AVG(total_cost_usd) as avg_cost
FROM logs
WHERE event_type = 'Pipeline execution completed'
GROUP BY iterations
ORDER BY iterations;
```

Daily cost and first-pass rate:

```sql
SELECT
    DATE(timestamp) as date,
    COUNT(*) as total_requests,
    SUM(total_cost_usd) as total_cost,
    AVG(total_cost_usd) as avg_cost_per_request,
    SUM(CASE WHEN iterations = 1 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 as first_pass_rate
FROM logs
WHERE event_type = 'Pipeline execution completed'
GROUP BY DATE(timestamp)
ORDER BY date DESC;
```

Daily latency percentiles:

```sql
SELECT
    DATE(timestamp) as date,
    AVG(duration_ms) as avg_duration,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95_duration,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99_duration
FROM logs
WHERE event_type = 'Pipeline execution completed'
GROUP BY DATE(timestamp)
ORDER BY date DESC;
```

Create custom dashboards to visualize key metrics.

Pipeline overview dashboard
What it shows:
- Real-time request volume
- Average response time
- Cost per request
- Success/failure rates
- Current iteration distribution
SQL Query:
```sql
SELECT
    DATE_TRUNC('hour', timestamp) as hour,
    COUNT(*) as total_requests,
    AVG(duration_ms) as avg_duration,
    SUM(cost_usd) as total_cost,
    AVG(quality_score) as avg_quality
FROM pipeline_executions
WHERE timestamp > NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour DESC;
```

Cost analysis dashboard

What it shows:
- Cost breakdown by stage
- Cost per question type
- Model comparison (OpenAI vs Anthropic)
- Cost trends over time
- Budget tracking
SQL Query:
```sql
SELECT
    stage_name,
    COUNT(*) as executions,
    SUM(cost_usd) as total_cost,
    AVG(cost_usd) as avg_cost,
    SUM(cost_usd) / (SELECT SUM(cost_usd)
                     FROM pipeline_stages
                     WHERE DATE(timestamp) = CURRENT_DATE) * 100 as pct_of_total
FROM pipeline_stages
WHERE DATE(timestamp) = CURRENT_DATE
GROUP BY stage_name
ORDER BY total_cost DESC;
```

Quality monitoring dashboard

What it shows:
- Quality score distribution
- Evaluator agreement rates
- Iteration patterns
- Question type performance
- Failure analysis
SQL Query:
```sql
SELECT
    question_category,
    COUNT(*) as questions,
    AVG(quality_score) as avg_score,
    AVG(openai_score - anthropic_score) as avg_disagreement,
    SUM(CASE WHEN iterations > 1 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 as iteration_rate
FROM pipeline_executions
WHERE timestamp > NOW() - INTERVAL '7 days'
GROUP BY question_category
ORDER BY iteration_rate DESC;
```

Configure alerts to catch issues proactively.
Trigger: Cost per request > $0.15
Action: Investigate expensive queries
SQL Query:
```sql
SELECT COUNT(*) as high_cost_requests
FROM logs
WHERE
    event_type = 'Pipeline execution completed'
    AND timestamp > NOW() - INTERVAL '1 hour'
    AND total_cost_usd > 0.15;
```

Trigger: Quality score < 75 for > 10% of requests
Action: Review recent answers
SQL Query:
```sql
SELECT
    COUNT(*) as total_requests,
    SUM(CASE WHEN quality_score < 75 THEN 1 ELSE 0 END) as low_quality_requests,
    SUM(CASE WHEN quality_score < 75 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 as low_quality_pct
FROM logs
WHERE
    event_type = 'Pipeline execution completed'
    AND timestamp > NOW() - INTERVAL '1 hour'
HAVING SUM(CASE WHEN quality_score < 75 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 > 10;
```

Trigger: Iteration rate > 25%
Action: Check RAG document quality
SQL Query:
```sql
SELECT
    COUNT(*) as total_requests,
    SUM(CASE WHEN iterations > 1 THEN 1 ELSE 0 END) as multi_iteration_requests,
    SUM(CASE WHEN iterations > 1 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 as iteration_rate
FROM logs
WHERE
    event_type = 'Pipeline execution completed'
    AND timestamp > NOW() - INTERVAL '1 hour'
HAVING SUM(CASE WHEN iterations > 1 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100 > 25;
```

Trigger: P95 latency > 10 seconds
Action: Optimize slow stages
SQL Query:
```sql
SELECT PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95_duration
FROM logs
WHERE
    event_type = 'Pipeline execution completed'
    AND timestamp > NOW() - INTERVAL '1 hour'
HAVING PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) > 10000;
```

Related documentation:

- Pipeline Guide - Detailed pipeline stages
- Configuration Guide - All configuration options
- Logfire Documentation - Official Logfire docs