python-cqrs-core¶
CQRS interfaces and base classes with built-in tracing and audit fields.
Installation¶
pip install python-cqrs-core
Public API¶
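| Class | Purpose |
|---|---|
| ICommand | Write operation interface |
| ICommandHandler | Command handler with an async handle() method |
| IQuery | Read operation interface |
| IQueryHandler | Query handler with an async handle() method |
| BaseCommand | Pydantic command with tracing and audit fields |
| BaseQuery | Pydantic query with tracing fields |
| PaginatedQuery | Query with pagination support |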
Guides¶
Usage Guide¶
Complete guide to using Python CQRS Core in your applications.
Commands¶
Commands represent write operations that modify state.
from python_cqrs_core import BaseCommand, ICommandHandler

class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    def __init__(self, user_repository):
        # The repository is injected so the handler can be tested in isolation
        self.user_repository = user_repository

    async def handle(self, command: CreateUserCommand) -> int:
        # Create user logic
        user_id = await self.user_repository.create(
            name=command.name,
            email=command.email
        )
        return user_id

# Usage (inside an async function; user_repository is your own repository)
cmd = CreateUserCommand(
    name="John Doe",
    email="john@example.com",
    requested_by="admin"
)
handler = CreateUserHandler(user_repository)
user_id = await handler.handle(cmd)
Queries¶
Queries represent read operations that fetch state.
from python_cqrs_core import BaseQuery, IQueryHandler

class GetUserQuery(BaseQuery):
    user_id: int

# User is your own domain model
class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    def __init__(self, user_repository):
        self.user_repository = user_repository

    async def handle(self, query: GetUserQuery) -> User:
        # Fetch user logic
        return await self.user_repository.get(query.user_id)

# Usage (inside an async function)
query = GetUserQuery(user_id=1, requested_by="admin")
handler = GetUserHandler(user_repository)
user = await handler.handle(query)
Paginated Queries¶
Queries with built-in pagination support.
from python_cqrs_core import PaginatedQuery

class ListUsersQuery(PaginatedQuery):
    status: str = "active"

query = ListUsersQuery(page=2, page_size=20, status="active")
offset = query.offset    # 20
limit = query.page_size  # 20

# Use in database query
users = await db.query(User).offset(query.offset).limit(query.page_size).all()
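The offset arithmetic can be checked without a database. The following is a minimal sketch using only the standard library; `PageRequest` and `paginate` are hypothetical stand-ins written for illustration, not the library's `PaginatedQuery`, and the validation simply mirrors the documented limits (page at least 1, page size between 1 and 100).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PageRequest:
    """Hypothetical stand-in for PaginatedQuery (not the library class)."""
    page: int = 1
    page_size: int = 10

    def __post_init__(self) -> None:
        # Mirror the documented constraints: page >= 1, 1 <= page_size <= 100.
        if self.page < 1:
            raise ValueError("page must be >= 1")
        if not 1 <= self.page_size <= 100:
            raise ValueError("page_size must be between 1 and 100")

    @property
    def offset(self) -> int:
        # Documented formula: (page - 1) * page_size
        return (self.page - 1) * self.page_size


def paginate(items: list, request: PageRequest) -> list:
    """Apply offset/limit to an in-memory list, as a database would."""
    return items[request.offset : request.offset + request.page_size]


rows = list(range(1, 51))  # 50 fake rows numbered 1..50
second_page = paginate(rows, PageRequest(page=2, page_size=20))
print(second_page[0], second_page[-1])  # 21 40
```

Because pages are 1-indexed, page 2 with a page size of 20 skips the first 20 rows and returns rows 21 through 40.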
Tracing Fields¶
All commands and queries include built-in tracing fields for observability:
cmd = CreateUserCommand(name="John", email="john@example.com")
print(cmd.request_id) # UUID
print(cmd.correlation_id) # Optional[UUID]
print(cmd.requested_by) # Optional[str]
print(cmd.requested_at) # datetime
Field Details¶
request_id¶
Type: UUID
Auto-generated: Yes
Purpose: Unique identifier for this specific command/query
Usage: Track the request through logs and traces
correlation_id¶
Type: Optional[UUID]
Default: None
Purpose: Link related operations across services
Usage: Set to another command’s request_id to create a trace chain
# Initial command
save_cmd = SaveStateCommand(token="abc123", entity_id=1)
await handler.handle(save_cmd)
# Related command - link them
finalize_cmd = FinalizeSessionCommand(
token="abc123",
correlation_id=save_cmd.request_id # Link to previous operation
)
await handler.handle(finalize_cmd)
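To see what the link buys you, here is a small standard-library sketch that groups messages into a chain by matching `correlation_id` against a root `request_id`. `TracedMessage` and `trace_chain` are hypothetical names used only for this illustration; the real fields live on BaseCommand/BaseQuery.

```python
from dataclasses import dataclass, field
from typing import List, Optional
from uuid import UUID, uuid4


@dataclass(frozen=True)
class TracedMessage:
    """Hypothetical stand-in carrying only the two ID fields."""
    name: str
    request_id: UUID = field(default_factory=uuid4)
    correlation_id: Optional[UUID] = None


def trace_chain(root: TracedMessage, messages: List[TracedMessage]) -> List[TracedMessage]:
    """Return the root plus every message whose correlation_id points at it."""
    return [root] + [m for m in messages if m.correlation_id == root.request_id]


save = TracedMessage("SaveState")
finalize = TracedMessage("FinalizeSession", correlation_id=save.request_id)
unrelated = TracedMessage("GetUser")

chain = trace_chain(save, [finalize, unrelated])
print([m.name for m in chain])  # ['SaveState', 'FinalizeSession']
```

The same matching rule is what a log search or trace query applies when you look up everything correlated with one request.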
requested_by¶
Type: Optional[str]
Default: None
Purpose: Identify who initiated the operation
Usage: Audit trail, security, logging
cmd = CreateUserCommand(
name="Jane",
email="jane@example.com",
requested_by="admin@example.com" # Audit trail
)
requested_at¶
Type: datetime
Auto-generated: Yes (UTC)
Purpose: Timestamp when command/query was created
Usage: Performance monitoring, audit trail
from datetime import datetime, timezone

cmd = CreateUserCommand(name="John", email="john@example.com")
# ... process the command ...
duration = (datetime.now(timezone.utc) - cmd.requested_at).total_seconds()
logger.info(f"Command processing took {duration}s")
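The duration calculation can be wrapped in a small helper so that it is testable with fixed timestamps. This is a sketch under the assumption that `requested_at` is always timezone-aware, as the UTC default implies; `elapsed_seconds` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def elapsed_seconds(requested_at: datetime, now: Optional[datetime] = None) -> float:
    """Seconds between requested_at and now (both must be timezone-aware)."""
    if requested_at.tzinfo is None:
        raise ValueError("requested_at must be timezone-aware")
    current = now or datetime.now(timezone.utc)
    return (current - requested_at).total_seconds()


created = datetime(2026, 2, 26, 10, 30, 45, tzinfo=timezone.utc)
finished = created + timedelta(milliseconds=342)
print(elapsed_seconds(created, finished))  # 0.342
```

Note that the clock starts when the command or query object is created, so the figure includes any time spent before the handler begins work, such as waiting in a queue.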
Complete Example¶
Here’s a complete example showing commands, queries, and handlers working together:
from python_cqrs_core import BaseCommand, BaseQuery, ICommandHandler, IQueryHandler
from datetime import datetime, timezone
import logging

logger = logging.getLogger(__name__)

# Domain Model
class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email

# Command
class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    def __init__(self, repository):
        self.repository = repository

    async def handle(self, command: CreateUserCommand) -> int:
        logger.info(
            f"[{command.request_id}] Creating user {command.email} "
            f"requested by {command.requested_by} at {command.requested_at}"
        )
        user_id = await self.repository.create(
            name=command.name,
            email=command.email
        )
        duration = (datetime.now(timezone.utc) - command.requested_at).total_seconds()
        logger.info(f"[{command.request_id}] User created in {duration}s")
        return user_id

# Query
class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    def __init__(self, repository):
        self.repository = repository

    async def handle(self, query: GetUserQuery) -> User:
        logger.info(
            f"[{query.request_id}] Fetching user {query.user_id} "
            f"requested by {query.requested_by}"
        )
        user = await self.repository.get(query.user_id)
        return user

# Usage (user_repository is your own repository instance)
async def main():
    # Create user
    create_cmd = CreateUserCommand(
        name="John Doe",
        email="john@example.com",
        requested_by="admin@example.com"
    )
    create_handler = CreateUserHandler(user_repository)
    user_id = await create_handler.handle(create_cmd)

    # Fetch user (linked via correlation_id)
    get_query = GetUserQuery(
        user_id=user_id,
        correlation_id=create_cmd.request_id,  # Link to create operation
        requested_by="admin@example.com"
    )
    get_handler = GetUserHandler(user_repository)
    user = await get_handler.handle(get_query)
    print(f"Created and fetched user: {user.name}")
Best Practices¶
Always Set requested_by¶
# ✅ Good
cmd = CreateUserCommand(
name="John",
email="john@example.com",
requested_by="admin@example.com"
)
# ❌ Bad (no audit trail)
cmd = CreateUserCommand(
name="John",
email="john@example.com"
)
Log with request_id¶
async def handle(self, command: CreateUserCommand) -> int:
    logger.info(f"[{command.request_id}] Starting user creation")
    try:
        user_id = await self.repository.create(command)
        logger.info(f"[{command.request_id}] User created successfully")
        return user_id
    except Exception as e:
        logger.error(f"[{command.request_id}] Failed to create user: {e}")
        raise
Track Performance¶
async def handle(self, query: GetUserQuery) -> User:
    start = query.requested_at
    user = await self.repository.get(query.user_id)
    duration = (datetime.now(timezone.utc) - start).total_seconds()
    metrics.record_query_duration("GetUser", duration)
    if duration > 1.0:
        logger.warning(
            f"[{query.request_id}] Slow query: GetUser "
            f"took {duration}s for user_id={query.user_id}"
        )
    return user
Next Steps¶
See Why Use BaseQuery/BaseCommand? for design rationale
See Observability Integration for OpenTelemetry/Sentry/Prometheus
See API Reference for complete API documentation
API Reference¶
Interfaces¶
ICommand¶
Marker interface for commands (write operations).
Commands represent intentions to change state in the system.
Usage:
from python_cqrs_core import ICommand

class MyCommand(ICommand):
    pass
ICommandHandler[TCommand, TResult]¶
Handler interface for commands.
Type Parameters:
TCommand: The command type this handler processes
TResult: The return type of the handler
Methods:
async handle(command: TCommand) -> TResult¶
Process the command and return a result.
Parameters:
command: The command to process
Returns:
Result of the command execution
Example:
from python_cqrs_core import ICommandHandler, BaseCommand

class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    async def handle(self, command: CreateUserCommand) -> int:
        # Create user logic
        user_id = await self.user_repository.create(
            name=command.name,
            email=command.email
        )
        return user_id
IQuery¶
Marker interface for queries (read operations).
Queries represent requests to read state from the system.
Usage:
from python_cqrs_core import IQuery

class MyQuery(IQuery):
    pass
IQueryHandler[TQuery, TResult]¶
Handler interface for queries.
Type Parameters:
TQuery: The query type this handler processes
TResult: The return type of the handler
Methods:
async handle(query: TQuery) -> TResult¶
Process the query and return a result.
Parameters:
query: The query to process
Returns:
Result of the query execution
Example:
from python_cqrs_core import IQueryHandler, BaseQuery

class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    async def handle(self, query: GetUserQuery) -> User:
        return await self.user_repository.get(query.user_id)
Base Classes¶
BaseCommand¶
Base command with tracing and audit fields.
All commands should extend this class to gain automatic observability features.
Inheritance:
BaseCommand(BaseModel, ICommand)
Fields:
request_id: UUID¶
Type: UUID
Default: Auto-generated via uuid4()
Description: Unique request identifier for this command
Usage: Track this specific command execution through logs and traces
correlation_id: Optional[UUID]¶
Type: Optional[UUID]
Default: None
Description: Correlation ID for distributed tracing
Usage: Link related commands/queries across service boundaries
requested_by: Optional[str]¶
Type: Optional[str]
Default: None
Description: User or system that initiated the command
Usage: Audit trail - track who performed the action
requested_at: datetime¶
Type: datetime
Default: Auto-generated via datetime.now(timezone.utc)
Description: Timestamp when command was created
Usage: Audit trail and performance monitoring
Configuration:
frozen=True: Commands are immutable after creation
Example:
from python_cqrs_core import BaseCommand

class CreateUserCommand(BaseCommand):
    name: str
    email: str

# Usage
cmd = CreateUserCommand(
    name="John Doe",
    email="john@example.com",
    requested_by="admin@example.com"
)
print(cmd.request_id)    # UUID('550e8400-e29b-41d4-a716-446655440000')
print(cmd.requested_by)  # "admin@example.com"
print(cmd.requested_at)  # datetime(2026, 2, 26, 10, 30, 45, tzinfo=timezone.utc)
BaseQuery¶
Base query with tracing fields.
All queries should extend this class to gain automatic observability features.
Inheritance:
BaseQuery(BaseModel, IQuery)
Fields:
Same as BaseCommand:
request_id: UUID
correlation_id: Optional[UUID]
requested_by: Optional[str]
requested_at: datetime
Configuration:
frozen=True: Queries are immutable after creation
Example:
from python_cqrs_core import BaseQuery

class GetUserQuery(BaseQuery):
    user_id: int

# Usage
query = GetUserQuery(
    user_id=123,
    requested_by="john@example.com"
)
print(query.request_id)  # UUID('abc12345-...')
print(query.user_id)     # 123
PaginatedQuery¶
Query with pagination support.
Extends BaseQuery with pagination fields and automatic offset calculation.
Inheritance:
PaginatedQuery(BaseQuery)
Fields:
All BaseQuery fields, plus:
page: int¶
Type: int
Default: 1
Constraints: >= 1 (1-indexed)
Description: Page number
Usage: Specify which page of results to retrieve
page_size: int¶
Type: int
Default: 10
Constraints: 1 <= page_size <= 100
Description: Items per page
Usage: Control result set size (max 100 items)
Properties:
offset: int¶
Calculated offset for database queries.
Formula: (page - 1) * page_size
Returns:
int: Offset value to use in database LIMIT/OFFSET queries
Example:
from python_cqrs_core import PaginatedQuery

class ListUsersQuery(PaginatedQuery):
    status: str = "active"

# Page 1
query = ListUsersQuery(page=1, page_size=20)
print(query.offset)  # 0

# Page 3
query = ListUsersQuery(page=3, page_size=20)
print(query.offset)  # 40

# Use in database query
users = await (
    db.query(User)
    .offset(query.offset)
    .limit(query.page_size)
    .all()
)
Type Hints¶
All interfaces and base classes support full generic type hints:
from python_cqrs_core import IQueryHandler, BaseQuery
from typing import List

class ListUsersQuery(BaseQuery):
    status: str

class User:
    id: int
    name: str

# Handler with proper type hints
class ListUsersHandler(IQueryHandler[ListUsersQuery, List[User]]):
    async def handle(self, query: ListUsersQuery) -> List[User]:
        # Type checker knows:
        # - query is ListUsersQuery
        # - Must return List[User]
        return await self.repo.list(status=query.status)
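For readers curious how a generic handler interface of this shape can be expressed, the sketch below builds one from `abc` and `typing.Generic`. `QueryHandler`, `ListNamesQuery`, and `ListNamesHandler` are hypothetical names for this example; this is one plausible construction, not the library's source.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, List, TypeVar

TQuery = TypeVar("TQuery")
TResult = TypeVar("TResult")


class QueryHandler(ABC, Generic[TQuery, TResult]):
    """Hypothetical equivalent of IQueryHandler; not the library's source."""

    @abstractmethod
    async def handle(self, query: TQuery) -> TResult:
        ...


@dataclass(frozen=True)
class ListNamesQuery:
    prefix: str


class ListNamesHandler(QueryHandler[ListNamesQuery, List[str]]):
    def __init__(self, names: List[str]) -> None:
        self._names = names

    async def handle(self, query: ListNamesQuery) -> List[str]:
        # A type checker can verify both the parameter and the return type.
        return [n for n in self._names if n.startswith(query.prefix)]


names_handler = ListNamesHandler(["ann", "bob", "anton"])
result = asyncio.run(names_handler.handle(ListNamesQuery(prefix="an")))
print(result)  # ['ann', 'anton']
```

Parameterizing the base class with the query and result types is what lets a type checker flag a handler that accepts the wrong query or returns the wrong type.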
Immutability¶
All queries and commands are immutable (frozen) after creation:
query = GetUserQuery(user_id=123)
query.user_id = 456  # ❌ Raises an error because the model is frozen
                     #    (pydantic.ValidationError under Pydantic v2, TypeError under v1)
# Must create new instance
new_query = GetUserQuery(user_id=456)  # ✅ OK
This ensures that queries/commands cannot be modified after creation, providing:
Thread safety
Predictable behavior
Prevention of accidental mutations in handlers
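The frozen behavior can be demonstrated with the standard library alone. The sketch below uses a frozen dataclass as a hypothetical stand-in (`GetUserRequest`); dataclasses signal a blocked mutation with `FrozenInstanceError`, whereas a frozen Pydantic model reports the same situation with its own exception type. Pydantic v2 also offers `model_copy(update=...)` as the counterpart of `dataclasses.replace` used here.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class GetUserRequest:
    """Hypothetical frozen stand-in for a query object."""
    user_id: int


original = GetUserRequest(user_id=123)

try:
    original.user_id = 456  # rejected: the instance is frozen
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False

# Derive a modified copy instead of mutating in place.
updated = replace(original, user_id=456)
print(mutation_blocked, original.user_id, updated.user_id)  # True 123 456
```

The original object is untouched, so any handler still holding a reference to it sees the values it was given.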
Observability Integration¶
Overview¶
BaseQuery and BaseCommand are designed to work seamlessly with modern observability tools like OpenTelemetry, Sentry, and Prometheus. They provide the crucial business context layer that complements infrastructure tracing.
Key Insight: Complementary, Not Redundant¶
Infrastructure Tracing vs Domain Tracing¶
| Aspect | OpenTelemetry/Sentry/Prometheus | BaseQuery/BaseCommand |
|---|---|---|
| Purpose | Technical/infrastructure tracing | Business/domain context |
| What it tracks | HTTP calls, DB queries, spans, metrics | Who, what, when at domain level |
| IDs | Opaque trace IDs | Business IDs |
| Searchability | By trace ID, span name | By user, email, business entity |
| Compliance | Must add explicit audit events | Built-in audit trail |
| Best for | “What’s slow?”, “Where did it fail?” | “Who did this?”, “What business operation?” |
Bottom line: Infrastructure tools show HOW your system works. Your BaseQuery/BaseCommand fields provide WHO and WHAT at the business level.
Integration Pattern: The Bridge¶
Your BaseQuery/BaseCommand fields become the attributes you attach to OpenTelemetry spans and Sentry events.
Domain Layer (Your Code)
↓ BaseQuery/BaseCommand with business fields
↓
Handler Layer
↓ Attach business fields to OTel span attributes
↓
Infrastructure Layer (OpenTelemetry)
↓ Technical tracing (HTTP, DB, cache, etc.)
↓
Observability Tools (Sentry/Prometheus/Jaeger)
↓ Unified view: Business context + Technical trace
OpenTelemetry Integration¶
Setup¶
pip install opentelemetry-api opentelemetry-sdk
Pattern 1: Attach Command/Query Context to Spans¶
Create a helper to bridge your domain fields to OpenTelemetry:
from opentelemetry import trace
from python_cqrs_core import BaseCommand, BaseQuery
from typing import Union

def attach_cqrs_context(
    span: trace.Span,
    cqrs_object: Union[BaseCommand, BaseQuery]
):
    """Attach BaseCommand/BaseQuery fields to OpenTelemetry span."""
    span.set_attribute("business.request_id", str(cqrs_object.request_id))
    if cqrs_object.correlation_id:
        span.set_attribute("business.correlation_id", str(cqrs_object.correlation_id))
    if cqrs_object.requested_by:
        span.set_attribute("requested_by", cqrs_object.requested_by)
    span.set_attribute("requested_at", cqrs_object.requested_at.isoformat())
    # Determine if it's a command or query
    span.set_attribute(
        "cqrs.type",
        "command" if isinstance(cqrs_object, BaseCommand) else "query"
    )
    span.set_attribute("cqrs.name", type(cqrs_object).__name__)
Pattern 2: Use in Your Handlers¶
from opentelemetry import trace

class CreateInviteCommandHandler:
    def __init__(self, repository):
        self.repository = repository
        self.tracer = trace.get_tracer("cqrs.write", "1.0.0")

    async def handle(self, command: CreateInviteCommand):
        with self.tracer.start_as_current_span("CreateInvite") as span:
            # Attach your domain context
            attach_cqrs_context(span, command)
            # Add business-specific attributes
            span.set_attribute("invite.email", command.email)
            span.set_attribute("invite.expires_in_days", command.expires_in_days)
            # Execute business logic
            result = await self.repository.create(command)
            # Add result attributes
            span.set_attribute("result.invite_id", result.id)
            span.set_attribute("result.token", result.token)
            return result
Pattern 3: Propagate Correlation IDs via Baggage¶
Use OpenTelemetry baggage to propagate your correlation IDs downstream:
from opentelemetry import baggage, context

async def handle(self, command: CreateInviteCommand):
    # Set correlation_id in baggage for downstream services
    correlation_id = str(command.correlation_id or command.request_id)
    ctx = baggage.set_baggage("correlation.id", correlation_id)
    # Attach the context: passing it to start_as_current_span only sets the
    # span's parent and does not make the baggage current
    token = context.attach(ctx)
    try:
        with self.tracer.start_as_current_span("CreateInvite") as span:
            attach_cqrs_context(span, command)
            # Instrumented outbound calls made here carry correlation.id
            result = await self.repository.create(command)
            return result
    finally:
        context.detach(token)
Pattern 4: Separate Command and Query Tracers (Best Practice 2026)¶
Create separate tracers for read and write paths:
# In your application setup
write_tracer = trace.get_tracer("cqrs.write", "1.0.0")
read_tracer = trace.get_tracer("cqrs.read", "1.0.0")

# Command handlers use write tracer
class CreateInviteHandler:
    tracer = write_tracer

# Query handlers use read tracer
class GetInviteHandler:
    tracer = read_tracer
This enables independent monitoring of write-side throughput vs read-side latency.
Sentry Integration¶
Setup¶
pip install sentry-sdk opentelemetry-sdk
Pattern 1: Automatic Trace Context Correlation¶
Initialize OpenTelemetry first, then Sentry with OpenTelemetry integration:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
import sentry_sdk
from sentry_sdk.integrations.opentelemetry import OpenTelemetryIntegration
# Initialize OpenTelemetry
trace.set_tracer_provider(TracerProvider())
# Initialize Sentry with OTel integration
sentry_sdk.init(
dsn="your-dsn",
integrations=[OpenTelemetryIntegration()],
traces_sample_rate=1.0,
)
Sentry now picks up the OpenTelemetry trace context automatically, and any BaseCommand/BaseQuery fields you attached as span attributes travel with it.
Pattern 2: Attach Context to Sentry Scope¶
import sentry_sdk

async def handle(self, command: CreateInviteCommand):
    with sentry_sdk.configure_scope() as scope:
        # Add your business context
        scope.set_tag("business.request_id", str(command.request_id))
        scope.set_tag("requested_by", command.requested_by or "anonymous")
        scope.set_context("command", {
            "type": type(command).__name__,
            "request_id": str(command.request_id),
            "correlation_id": str(command.correlation_id) if command.correlation_id else None,
            "requested_by": command.requested_by,
            "requested_at": command.requested_at.isoformat(),
        })
    # Execute command
    result = await self.repository.create(command)
    return result
Pattern 3: Search Sentry by Business Context¶
When errors occur, search Sentry by your domain fields:
# User reports: "My invite creation failed"
# User email: john@example.com
Search Sentry:
requested_by:"john@example.com" AND type:"CreateInviteCommand"
Result:
Error: DatabaseError
↳ business.request_id: 550e8400-e29b-41d4-a716-446655440000
↳ requested_by: john@example.com
↳ command.type: CreateInviteCommand
↳ OTel Trace ID: abc123def456...
Click trace ID → See full distributed trace!
Prometheus Integration¶
Pattern: Use requested_at for Request Duration Metrics¶
from prometheus_client import Histogram
from datetime import datetime, timezone

# Define metric
command_duration = Histogram(
    'cqrs_command_duration_seconds',
    'Command execution duration',
    ['command_type', 'requested_by', 'status']
)

async def handle(self, command: CreateInviteCommand):
    start = command.requested_at
    status = "success"
    try:
        result = await self.repository.create(command)
        return result
    except Exception:
        status = "error"
        raise
    finally:
        duration = (datetime.now(timezone.utc) - start).total_seconds()
        # Record metric with business context
        command_duration.labels(
            command_type=type(command).__name__,
            requested_by=command.requested_by or "anonymous",
            status=status
        ).observe(duration)
Pattern: Counter by User¶
from prometheus_client import Counter

command_counter = Counter(
    'cqrs_commands_total',
    'Total commands processed',
    ['command_type', 'requested_by']
)

async def handle(self, command: CreateInviteCommand):
    # Increment counter with business context
    command_counter.labels(
        command_type=type(command).__name__,
        requested_by=command.requested_by or "anonymous"
    ).inc()
    result = await self.repository.create(command)
    return result
Now in Prometheus/Grafana you can query:
# Commands per user
sum by (requested_by) (cqrs_commands_total)
# P95 latency by command type (computed from the histogram's _bucket series)
histogram_quantile(0.95, sum by (le, command_type) (rate(cqrs_command_duration_seconds_bucket[5m])))
Audit Trail for Compliance (SOC 2, HIPAA, PCI DSS)¶
Using OpenTelemetry Span Events (2026 Best Practice)¶
async def handle(self, command: CreateInviteCommand):
    with tracer.start_as_current_span("CreateInvite") as span:
        attach_cqrs_context(span, command)
        # Execute command
        result = await self.repository.create(command)
        # Record audit event
        span.add_event(
            "invite.created",
            attributes={
                # Compliance fields
                "audit.action": "CREATE_INVITE",
                "audit.actor": command.requested_by or "system",
                "audit.resource_type": "invite",
                "audit.resource_id": str(result.id),
                "audit.timestamp": command.requested_at.isoformat(),
                # Business context
                "audit.business_request_id": str(command.request_id),
                # Attribute values cannot be None, so fall back to an empty string
                "audit.correlation_id": str(command.correlation_id) if command.correlation_id else "",
                # Data classification
                "compliance.data_classification": "PII",
                "compliance.contains_email": True,
                # Result
                "audit.result": "success",
            }
        )
        return result
This creates audit trail entries that:
Automatically correlate with the originating request
Are stored in your tracing backend
Support compliance requirements
Can be queried for reports
Real-World Example: Full Integration¶
Production Incident Investigation¶
Step 1: User Reports Issue
"My order ORDER-12345 failed 10 minutes ago" - john@example.com
Step 2: Search Application Logs (BaseCommand Fields)
grep "requested_by=john@example.com" logs.txt | grep "ORDER-12345"
[550e8400-...] CreateOrderCommand | order_id=ORDER-12345 |
requested_by=john@example.com | at=2026-02-26 10:20:45
Step 3: Use request_id to Find OTel Trace in Sentry
# Search Sentry for: business.request_id=550e8400-...
Step 4: See Full Distributed Trace
Trace: abc123def456...
├─ HTTP POST /orders [200ms]
├─ CreateOrderCommand [business.request_id: 550e8400] [150ms]
│ ├─ ValidatePayment [50ms] ✅
│ ├─ CheckInventory [30ms] ✅
│ └─ SaveOrder [70ms] ❌ DATABASE_ERROR
└─ HTTP Response [500] ❌
Error: Connection timeout to postgres
Sentry Context:
- requested_by: john@example.com
- business.request_id: 550e8400-...
- order.id: ORDER-12345
Result:
Found the exact user’s request via your domain fields
Saw the full technical trace via OpenTelemetry
Diagnosed root cause: Database connection issue
Without your BaseCommand fields: You’d search through 1000s of traces with no business context.
Complete Handler Example¶
Here’s a production-ready handler with full observability:
from datetime import datetime, timezone
from opentelemetry import trace, baggage, context
from prometheus_client import Histogram, Counter
import sentry_sdk
from python_cqrs_core import BaseCommand, ICommandHandler

class CreateInviteCommand(BaseCommand):
    email: str
    expires_in_days: int

# InviteResult is your own result type
class CreateInviteHandler(ICommandHandler[CreateInviteCommand, InviteResult]):
    def __init__(self, repository, metrics_registry):
        self.repository = repository
        self.tracer = trace.get_tracer("cqrs.write", "1.0.0")
        # Prometheus metrics
        self.duration_histogram = Histogram(
            'create_invite_duration_seconds',
            'CreateInvite command duration',
            ['requested_by', 'status']
        )
        self.counter = Counter(
            'create_invite_total',
            'Total CreateInvite commands',
            ['requested_by']
        )

    async def handle(self, command: CreateInviteCommand) -> InviteResult:
        # Set up correlation and make the baggage part of the current context
        correlation_id = str(command.correlation_id or command.request_id)
        token = context.attach(baggage.set_baggage("correlation.id", correlation_id))
        status = "success"
        try:
            with self.tracer.start_as_current_span("CreateInvite") as span:
                try:
                    # Attach business context to OTel span
                    span.set_attribute("business.request_id", str(command.request_id))
                    span.set_attribute("requested_by", command.requested_by or "anonymous")
                    span.set_attribute("cqrs.type", "command")
                    span.set_attribute("cqrs.command.type", "CreateInvite")
                    span.set_attribute("invite.email", command.email)
                    # Attach context to Sentry
                    with sentry_sdk.configure_scope() as scope:
                        scope.set_tag("business.request_id", str(command.request_id))
                        scope.set_tag("requested_by", command.requested_by or "anonymous")
                        scope.set_context("command", {
                            "type": "CreateInviteCommand",
                            "request_id": str(command.request_id),
                            "email": command.email,
                        })
                    # Increment Prometheus counter
                    self.counter.labels(
                        requested_by=command.requested_by or "anonymous"
                    ).inc()
                    # Execute business logic
                    result = await self.repository.create(command)
                    # Record audit event
                    span.add_event(
                        "invite.created",
                        attributes={
                            "audit.action": "CREATE_INVITE",
                            "audit.actor": command.requested_by or "system",
                            "audit.resource_id": str(result.id),
                            "audit.timestamp": command.requested_at.isoformat(),
                        }
                    )
                    return result
                except Exception as e:
                    status = "error"
                    span.set_status(trace.Status(trace.StatusCode.ERROR))
                    span.record_exception(e)
                    raise
        finally:
            # Restore the previous context, then record duration in Prometheus
            context.detach(token)
            duration = (datetime.now(timezone.utc) - command.requested_at).total_seconds()
            self.duration_histogram.labels(
                requested_by=command.requested_by or "anonymous",
                status=status
            ).observe(duration)
Best Practices (2026)¶
1. Use Separate Tracers for Commands and Queries¶
# Application setup
write_tracer = trace.get_tracer("cqrs.write", "1.0.0")
read_tracer = trace.get_tracer("cqrs.read", "1.0.0")

# Command handlers
class MyCommandHandler:
    tracer = write_tracer

# Query handlers
class MyQueryHandler:
    tracer = read_tracer
This enables independent performance analysis of read vs write paths.
2. Propagate Business Correlation IDs¶
Always set correlation IDs in baggage:
correlation_id = str(command.correlation_id or command.request_id)
# Attach the baggage so it is part of the current context
token = context.attach(baggage.set_baggage("correlation.id", correlation_id))
try:
    with tracer.start_as_current_span("MyOperation"):
        # Instrumented outbound calls carry correlation.id as baggage
        await self.external_service.call()
finally:
    context.detach(token)
3. Structure Audit Events Consistently¶
Use a consistent schema for all audit events:
AUDIT_EVENT_SCHEMA = {
"audit.action": "string", # CREATE_USER, DELETE_ORDER, etc.
"audit.actor": "string", # Who (from requested_by)
"audit.resource_type": "string", # What type (user, order, invite)
"audit.resource_id": "string", # Which one (user_id, order_id)
"audit.timestamp": "ISO8601", # When (from requested_at)
"audit.business_request_id": "UUID", # Unique identifier
"audit.result": "string", # success, denied, error
}
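One way to keep every audit event on this schema is to build the attribute dictionary in a single helper. The sketch below is a hypothetical `build_audit_attributes` function using only the standard library; it returns plain strings for every key, which keeps the result usable as span event attributes.

```python
from datetime import datetime, timezone
from typing import Dict, Optional
from uuid import UUID


def build_audit_attributes(
    action: str,
    resource_type: str,
    resource_id: str,
    request_id: UUID,
    requested_at: datetime,
    requested_by: Optional[str] = None,
    result: str = "success",
) -> Dict[str, str]:
    """Build a flat, string-only attribute dict following the audit schema."""
    return {
        "audit.action": action,
        "audit.actor": requested_by or "system",  # fall back when no actor is set
        "audit.resource_type": resource_type,
        "audit.resource_id": resource_id,
        "audit.timestamp": requested_at.isoformat(),
        "audit.business_request_id": str(request_id),
        "audit.result": result,
    }


attrs = build_audit_attributes(
    action="CREATE_INVITE",
    resource_type="invite",
    resource_id="42",
    request_id=UUID("550e8400-e29b-41d4-a716-446655440000"),
    requested_at=datetime(2026, 2, 26, 10, 30, 45, tzinfo=timezone.utc),
)
print(attrs["audit.actor"], attrs["audit.timestamp"])
# system 2026-02-26T10:30:45+00:00
```

A handler could then pass the returned dictionary as the `attributes` argument of `span.add_event`, so the schema is defined in exactly one place.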
4. Use request_id for Log Correlation¶
Structure your logs to include request_id:
import logging
import structlog

logger = structlog.get_logger()

async def handle(self, command: CreateInviteCommand):
    # Bind request_id to all logs in this context
    log = logger.bind(
        request_id=str(command.request_id),
        requested_by=command.requested_by,
        command_type=type(command).__name__
    )
    log.info("processing_command")
    result = await self.repository.create(command)
    log.info("command_completed", invite_id=result.id)
    return result
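If structlog is not available, a similar binding can be approximated with the standard library's `logging.LoggerAdapter`. This is a sketch of that alternative, not the approach shown above; `RequestLogAdapter` and the logger name `cqrs.example` are hypothetical.

```python
import io
import logging
from uuid import UUID


class RequestLogAdapter(logging.LoggerAdapter):
    """Prefix every message with the bound request_id."""

    def process(self, msg, kwargs):
        return f"[{self.extra['request_id']}] {msg}", kwargs


# Write to an in-memory stream so the output is easy to inspect.
stream = io.StringIO()
stream_handler = logging.StreamHandler(stream)
stream_handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

base_logger = logging.getLogger("cqrs.example")
base_logger.setLevel(logging.INFO)
base_logger.addHandler(stream_handler)
base_logger.propagate = False

request_id = UUID("550e8400-e29b-41d4-a716-446655440000")
log = RequestLogAdapter(base_logger, {"request_id": request_id})

log.info("processing_command")
log.info("command_completed")
print(stream.getvalue(), end="")
```

Every line emitted through the adapter starts with the request ID, so a single search for that ID returns the whole lifecycle of the command.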
Query Examples¶
Find All Operations by User¶
Application logs:
grep 'requested_by=john@example.com' app.log
Sentry:
Search: requested_by:"john@example.com"
Jaeger (OpenTelemetry):
Tags: requested_by="john@example.com"
Trace Request Flow¶
Using correlation_id:
# Find initial request
grep 'request_id=550e8400' app.log
# Find all related operations
grep 'correlation_id=550e8400' app.log
In Jaeger:
Tags: business.correlation_id="550e8400..."
→ Shows all connected operations across services
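The two grep searches can be combined into one pass. The sketch below assumes a hypothetical `key=value` log format in which every line carries `request_id=` and `correlation_id=`; adjust the needles to whatever your log formatter actually emits.

```python
from typing import List


def related_lines(log_lines: List[str], request_id_prefix: str) -> List[str]:
    """Lines that either belong to the request or correlate to it."""
    needles = (
        f"request_id={request_id_prefix}",
        f"correlation_id={request_id_prefix}",
    )
    return [line for line in log_lines if any(n in line for n in needles)]


# Hypothetical log lines for illustration only.
log_lines = [
    "request_id=550e8400 correlation_id=- SaveState token=abc123",
    "request_id=abc12345 correlation_id=550e8400 FinalizeSession token=abc123",
    "request_id=9f1c2d3e correlation_id=- GetUser user_id=7",
]

for line in related_lines(log_lines, "550e8400"):
    print(line)
```

The first two lines match: one is the original request and the other names it as its correlation ID. The third belongs to an unrelated request and is filtered out.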
Summary: Why Keep BaseQuery/BaseCommand with OTel/Sentry?¶
They Solve Different Problems¶
| Problem | Solution |
|---|---|
| “Which request failed?” | request_id |
| “Where in the code did it fail?” | OpenTelemetry spans |
| “Who made this request?” | requested_by |
| “What was the error?” | Sentry error tracking |
| “How long did it take?” | Both (requested_at and OpenTelemetry spans) |
| “What’s the related operation?” | correlation_id |
Your Fields Are the Bridge¶
User Reports Issue
↓
Search by business context (requested_by, order_id)
↓ [YOUR FIELDS]
Find request_id in logs
↓
Search Sentry/Jaeger by business.request_id
↓ [INFRASTRUCTURE TOOLS]
View full distributed trace
↓
Diagnose technical root cause
Without your BaseCommand fields: Infrastructure tools have no business context.
Without infrastructure tools: Your fields have no technical details.
Together: Complete observability from business to infrastructure! 🎯
Why Use BaseQuery/BaseCommand?¶
Overview¶
While BaseQuery and BaseCommand extend Pydantic’s BaseModel, they provide critical production-ready features that go beyond basic validation.
Benefits¶
1. Zero-Effort Observability¶
Every query/command automatically gets tracing fields without manual declaration:
# Plain Pydantic (NO observability)
class GetUserQuery(BaseModel):
    user_id: int
    # No way to track this request through logs

# With BaseQuery (FULL observability)
class GetUserQuery(BaseQuery):
    user_id: int
    # Automatically includes: request_id, correlation_id, requested_by, requested_at
2. Production Debugging¶
Without BaseQuery:
# Handler logging
async def handle(self, query: GetUserQuery):
    logger.info("Processing GetUser")  # ❌ Which request? Who? When?
    return await self.repo.get(query.user_id)
With BaseQuery:
# Handler logging
async def handle(self, query: GetUserQuery):
    logger.info(
        f"[{query.request_id}] Processing GetUser "
        f"for user_id={query.user_id} by {query.requested_by}"
    )
    result = await self.repo.get(query.user_id)
    duration = (datetime.now(timezone.utc) - query.requested_at).total_seconds()
    logger.info(f"[{query.request_id}] Retrieved user in {duration}s")
    return result
Now in logs:
[550e8400-...] Processing GetUser for user_id=123 by john@example.com
[550e8400-...] Retrieved user in 0.342s
3. Distributed Tracing¶
Link related operations across your system:
# Initial command
save_cmd = SaveStateCommand(
token="abc123",
entity_id=1,
requested_by="john@example.com"
)
await handler.handle(save_cmd)
# request_id: 550e8400-e29b-41d4-a716-446655440000
# Related command - link them!
finalize_cmd = FinalizeSessionCommand(
token="abc123",
correlation_id=save_cmd.request_id, # ← Trace the relationship
requested_by="john@example.com"
)
await handler.handle(finalize_cmd)
# Logs show the connection:
# [550e8400] SaveState for token=abc123
# [abc12345 | correlation: 550e8400] FinalizeSession for token=abc123
4. Audit Trail for Compliance¶
Track who did what and when:
cmd = CreateInviteCommand(
email="user@example.com",
expires_in_days=7,
requested_by="admin@example.com" # ← Audit trail
)
# Later, in your database/logs:
# "Invite created by admin@example.com at 2026-02-26 10:30:45 [request: 550e8400]"
5. Built-in Immutability¶
Prevent accidental mutations:
query = GetUserQuery(user_id=123)
query.user_id = 456  # ❌ Raises an error because the model is frozen
# Ensures queries/commands can't be tampered with after creation
6. Performance Monitoring¶
Track execution times automatically:
async def handle(self, query: GetUserQuery):
    start = query.requested_at
    result = await self.repo.get(query.user_id)
    duration = (datetime.now(timezone.utc) - start).total_seconds()
    # Send to monitoring system
    metrics.record_query_duration(
        query_type="GetUser",
        duration=duration,
        request_id=str(query.request_id)
    )
    return result
When to Use Plain BaseModel vs BaseQuery/BaseCommand¶
| Use Case | Use Plain BaseModel | Use BaseQuery/BaseCommand |
|---|---|---|
| API Request/Response DTOs | ✅ Yes | ❌ No (too many fields) |
| Internal Data Transfer | ✅ Yes | ❌ No (overhead) |
| CQRS Commands | ❌ No | ✅ YES (tracing needed) |
| CQRS Queries | ❌ No | ✅ YES (tracing needed) |
| Domain Events | ✅ Maybe | ✅ Maybe (if tracing needed) |
| Configuration Objects | ✅ Yes | ❌ No (static data) |
Real-World Scenario¶
Imagine debugging a production issue:
❌ Without BaseQuery:
User reports: "My state save failed 10 minutes ago"
You search logs:
"Error: Token not found"
"Error: Token not found"
"Error: Token not found"
→ Which error is theirs? No way to know. No timestamp. No user ID.
✅ With BaseQuery:
User reports: "My state save failed 10 minutes ago"
You search logs for their email in requested_by:
"[550e8400] Error: Token not found | token=abc123 | user=john@example.com | at=2026-02-26 10:20:45"
→ Found it immediately! Can now trace the full request flow.
The Bottom Line¶
BaseQuery/BaseCommand = BaseModel + Production-Ready Observability
Same Pydantic validation
Same performance
+ Automatic tracing
+ Audit trail
+ Immutability
+ Debug-friendly
Cost: 4 extra fields, two of them auto-generated (~100 bytes per instance)
Benefit: Complete production observability and audit trail
Use BaseQuery/BaseCommand for all CQRS operations. Use plain BaseModel for everything else.
Design Patterns¶
CQRS Pattern¶
Separates read and write operations:
Commands: Modify state (CREATE, UPDATE, DELETE)
Queries: Read state (GET, LIST)
This separation provides:
Independent scaling of reads and writes
Optimized data models for each operation type
Clear intent in your codebase
Handler Pattern¶
Each command/query has a dedicated handler:
Single responsibility: Each handler does one thing
Easy to test: Isolated business logic
Composable with middleware: Add logging, validation, authorization
Example:
# Command and its dedicated handler
class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    async def handle(self, command: CreateUserCommand) -> int:
        # Single responsibility: create a user
        return await self.user_repository.create(command)

# Query and its dedicated handler
class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    async def handle(self, query: GetUserQuery) -> User:
        # Single responsibility: fetch a user
        return await self.user_repository.get(query.user_id)
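The "composable with middleware" point can be illustrated by wrapping a handle function in small decorators. The sketch below is one possible shape using plain async functions and dictionaries; `with_logging`, `with_validation`, and `create_user` are hypothetical names, and the library itself may not ship a middleware mechanism.

```python
import asyncio
from typing import Awaitable, Callable, List

Handle = Callable[[dict], Awaitable[int]]


def with_logging(inner: Handle, log: List[str]) -> Handle:
    """Wrap a handle function so each call is recorded before and after."""
    async def wrapped(command: dict) -> int:
        log.append(f"start {command['name']}")
        result = await inner(command)
        log.append(f"done {command['name']} -> {result}")
        return result
    return wrapped


def with_validation(inner: Handle) -> Handle:
    """Reject commands with an empty email before they reach the handler."""
    async def wrapped(command: dict) -> int:
        if not command.get("email"):
            raise ValueError("email is required")
        return await inner(command)
    return wrapped


async def create_user(command: dict) -> int:
    # Stand-in for the real handler body; returns a fake user ID.
    return 1


events: List[str] = []
pipeline = with_logging(with_validation(create_user), events)
user_id = asyncio.run(pipeline({"name": "CreateUser", "email": "john@example.com"}))
print(user_id, events)
```

Because each handler exposes a single `handle` entry point, cross-cutting concerns can be layered around it without touching the business logic inside.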