python-cqrs-core

CQRS interfaces and base classes with built-in tracing and audit fields.

Installation

pip install python-cqrs-core

Public API

Class

Purpose

ICommand

Write operation interface

ICommandHandler

Command handler with handle()

IQuery

Read operation interface

IQueryHandler

Query handler with handle()

BaseCommand

Pydantic command with trace_id, user_id

BaseQuery

Pydantic query with tracing fields

PaginatedQuery

Query with page, page_size, offset

Guides

Usage Guide

Complete guide to using Python CQRS Core in your applications.

Commands

Commands represent write operations that modify state.

from python_cqrs_core import BaseCommand, ICommandHandler

class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    async def handle(self, command: CreateUserCommand) -> int:
        # Create user logic
        user_id = await self.user_repository.create(
            name=command.name,
            email=command.email
        )
        return user_id

# Usage
cmd = CreateUserCommand(
    name="John Doe",
    email="john@example.com",
    requested_by="admin"
)
handler = CreateUserHandler()
user_id = await handler.handle(cmd)

Queries

Queries represent read operations that fetch state.

from python_cqrs_core import BaseQuery, IQueryHandler

class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    async def handle(self, query: GetUserQuery) -> User:
        # Fetch user logic
        return await self.user_repository.get(query.user_id)

# Usage
query = GetUserQuery(user_id=1, requested_by="admin")
handler = GetUserHandler()
user = await handler.handle(query)

Paginated Queries

Queries with built-in pagination support.

from python_cqrs_core import PaginatedQuery

class ListUsersQuery(PaginatedQuery):
    status: str = "active"

query = ListUsersQuery(page=2, page_size=20, status="active")
offset = query.offset  # 20
limit = query.page_size  # 20

# Use in database query
users = await db.query(User).offset(query.offset).limit(query.page_size).all()

Tracing Fields

All commands and queries include built-in tracing fields for observability:

cmd = CreateUserCommand(name="John", email="john@example.com")

print(cmd.request_id)       # UUID
print(cmd.correlation_id)   # Optional[UUID]
print(cmd.requested_by)     # Optional[str]
print(cmd.requested_at)     # datetime

Field Details

request_id

  • Type: UUID

  • Auto-generated: Yes

  • Purpose: Unique identifier for this specific command/query

  • Usage: Track the request through logs and traces

correlation_id

  • Type: Optional[UUID]

  • Default: None

  • Purpose: Link related operations across services

  • Usage: Set to another command’s request_id to create a trace chain

# Initial command
save_cmd = SaveStateCommand(token="abc123", entity_id=1)
await handler.handle(save_cmd)

# Related command - link them
finalize_cmd = FinalizeSessionCommand(
    token="abc123",
    correlation_id=save_cmd.request_id  # Link to previous operation
)
await handler.handle(finalize_cmd)

requested_by

  • Type: Optional[str]

  • Default: None

  • Purpose: Identify who initiated the operation

  • Usage: Audit trail, security, logging

cmd = CreateUserCommand(
    name="Jane",
    email="jane@example.com",
    requested_by="admin@example.com"  # Audit trail
)

requested_at

  • Type: datetime

  • Auto-generated: Yes (UTC)

  • Purpose: Timestamp when command/query was created

  • Usage: Performance monitoring, audit trail

cmd = CreateUserCommand(name="John", email="john@example.com")
duration = (datetime.now(timezone.utc) - cmd.requested_at).total_seconds()
logger.info(f"Command processing took {duration}s")

Complete Example

Here’s a complete example showing commands, queries, and handlers working together:

from python_cqrs_core import BaseCommand, BaseQuery, ICommandHandler, IQueryHandler
from datetime import datetime, timezone
import logging

logger = logging.getLogger(__name__)

# Domain Model
class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email

# Command
class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    def __init__(self, repository):
        self.repository = repository
    
    async def handle(self, command: CreateUserCommand) -> int:
        logger.info(
            f"[{command.request_id}] Creating user {command.email} "
            f"requested by {command.requested_by} at {command.requested_at}"
        )
        
        user_id = await self.repository.create(
            name=command.name,
            email=command.email
        )
        
        duration = (datetime.now(timezone.utc) - command.requested_at).total_seconds()
        logger.info(f"[{command.request_id}] User created in {duration}s")
        
        return user_id

# Query
class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    def __init__(self, repository):
        self.repository = repository
    
    async def handle(self, query: GetUserQuery) -> User:
        logger.info(
            f"[{query.request_id}] Fetching user {query.user_id} "
            f"requested by {query.requested_by}"
        )
        
        user = await self.repository.get(query.user_id)
        
        return user

# Usage
async def main():
    # Create user
    create_cmd = CreateUserCommand(
        name="John Doe",
        email="john@example.com",
        requested_by="admin@example.com"
    )
    create_handler = CreateUserHandler(user_repository)
    user_id = await create_handler.handle(create_cmd)
    
    # Fetch user (linked via correlation_id)
    get_query = GetUserQuery(
        user_id=user_id,
        correlation_id=create_cmd.request_id,  # Link to create operation
        requested_by="admin@example.com"
    )
    get_handler = GetUserHandler(user_repository)
    user = await get_handler.handle(get_query)
    
    print(f"Created and fetched user: {user.name}")

Best Practices

Always Set requested_by

# ✅ Good
cmd = CreateUserCommand(
    name="John",
    email="john@example.com",
    requested_by="admin@example.com"
)

# ❌ Bad (no audit trail)
cmd = CreateUserCommand(
    name="John",
    email="john@example.com"
)

Log with request_id

async def handle(self, command: CreateUserCommand) -> int:
    logger.info(f"[{command.request_id}] Starting user creation")
    
    try:
        user_id = await self.repository.create(command)
        logger.info(f"[{command.request_id}] User created successfully")
        return user_id
    except Exception as e:
        logger.error(f"[{command.request_id}] Failed to create user: {e}")
        raise

Track Performance

async def handle(self, query: GetUserQuery) -> User:
    start = query.requested_at
    
    user = await self.repository.get(query.user_id)
    
    duration = (datetime.now(timezone.utc) - start).total_seconds()
    metrics.record_query_duration("GetUser", duration)
    
    if duration > 1.0:
        logger.warning(
            f"[{query.request_id}] Slow query: GetUser "
            f"took {duration}s for user_id={query.user_id}"
        )
    
    return user

Next Steps

API Reference

Interfaces

ICommand

Marker interface for commands (write operations).

Commands represent intentions to change state in the system.

Usage:

from python_cqrs_core import ICommand

class MyCommand(ICommand):
    pass

ICommandHandler[TCommand, TResult]

Handler interface for commands.

Type Parameters:

  • TCommand: The command type this handler processes

  • TResult: The return type of the handler

Methods:

async handle(command: TCommand) -> TResult

Process the command and return a result.

Parameters:

  • command: The command to process

Returns:

  • Result of the command execution

Example:

from python_cqrs_core import ICommandHandler, BaseCommand

class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    async def handle(self, command: CreateUserCommand) -> int:
        # Create user logic
        user_id = await self.user_repository.create(
            name=command.name,
            email=command.email
        )
        return user_id

IQuery

Marker interface for queries (read operations).

Queries represent requests to read state from the system.

Usage:

from python_cqrs_core import IQuery

class MyQuery(IQuery):
    pass

IQueryHandler[TQuery, TResult]

Handler interface for queries.

Type Parameters:

  • TQuery: The query type this handler processes

  • TResult: The return type of the handler

Methods:

async handle(query: TQuery) -> TResult

Process the query and return a result.

Parameters:

  • query: The query to process

Returns:

  • Result of the query execution

Example:

from python_cqrs_core import IQueryHandler, BaseQuery

class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    async def handle(self, query: GetUserQuery) -> User:
        return await self.user_repository.get(query.user_id)

Base Classes

BaseCommand

Base command with tracing and audit fields.

All commands should extend this class to gain automatic observability features.

Inheritance:

BaseCommand(BaseModel, ICommand)

Fields:

request_id: UUID

  • Type: UUID

  • Default: Auto-generated via uuid4()

  • Description: Unique request identifier for this command

  • Usage: Track this specific command execution through logs and traces

correlation_id: Optional[UUID]

  • Type: Optional[UUID]

  • Default: None

  • Description: Correlation ID for distributed tracing

  • Usage: Link related commands/queries across service boundaries

requested_by: Optional[str]

  • Type: Optional[str]

  • Default: None

  • Description: User or system that initiated the command

  • Usage: Audit trail - track who performed the action

requested_at: datetime

  • Type: datetime

  • Default: Auto-generated via datetime.now(timezone.utc)

  • Description: Timestamp when command was created

  • Usage: Audit trail and performance monitoring

Configuration:

  • frozen=True: Commands are immutable after creation

Example:

from python_cqrs_core import BaseCommand

class CreateUserCommand(BaseCommand):
    name: str
    email: str

# Usage
cmd = CreateUserCommand(
    name="John Doe",
    email="john@example.com",
    requested_by="admin@example.com"
)

print(cmd.request_id)      # UUID('550e8400-e29b-41d4-a716-446655440000')
print(cmd.requested_by)    # "admin@example.com"
print(cmd.requested_at)    # datetime(2026, 2, 26, 10, 30, 45, tzinfo=timezone.utc)

BaseQuery

Base query with tracing fields.

All queries should extend this class to gain automatic observability features.

Inheritance:

BaseQuery(BaseModel, IQuery)

Fields:

Same as BaseCommand:

  • request_id: UUID

  • correlation_id: Optional[UUID]

  • requested_by: Optional[str]

  • requested_at: datetime

Configuration:

  • frozen=True: Queries are immutable after creation

Example:

from python_cqrs_core import BaseQuery

class GetUserQuery(BaseQuery):
    user_id: int

# Usage
query = GetUserQuery(
    user_id=123,
    requested_by="john@example.com"
)

print(query.request_id)    # UUID('abc12345-...')
print(query.user_id)       # 123

PaginatedQuery

Query with pagination support.

Extends BaseQuery with pagination fields and automatic offset calculation.

Inheritance:

PaginatedQuery(BaseQuery)

Fields:

All BaseQuery fields, plus:

page: int

  • Type: int

  • Default: 1

  • Constraints: >= 1 (1-indexed)

  • Description: Page number

  • Usage: Specify which page of results to retrieve

page_size: int

  • Type: int

  • Default: 10

  • Constraints: 1 <= page_size <= 100

  • Description: Items per page

  • Usage: Control result set size (max 100 items)

Properties:

offset: int

Calculated offset for database queries.

Formula: (page - 1) * page_size

Returns:

  • int: Offset value to use in database LIMIT/OFFSET queries

Example:

from python_cqrs_core import PaginatedQuery

class ListUsersQuery(PaginatedQuery):
    status: str = "active"

# Page 1
query = ListUsersQuery(page=1, page_size=20)
print(query.offset)  # 0

# Page 3
query = ListUsersQuery(page=3, page_size=20)
print(query.offset)  # 40

# Use in database query
users = await db.query(User)\
    .offset(query.offset)\
    .limit(query.page_size)\
    .all()

Type Hints

All interfaces and base classes support full generic type hints:

from python_cqrs_core import IQueryHandler, BaseQuery
from typing import List

class ListUsersQuery(BaseQuery):
    status: str

class User:
    id: int
    name: str

# Handler with proper type hints
class ListUsersHandler(IQueryHandler[ListUsersQuery, List[User]]):
    async def handle(self, query: ListUsersQuery) -> List[User]:
        # Type checker knows:
        # - query is ListUsersQuery
        # - Must return List[User]
        return await self.repo.list(status=query.status)

Immutability

All queries and commands are immutable (frozen) after creation:

query = GetUserQuery(user_id=123)
query.user_id = 456  # ❌ Raises: FrozenInstanceError

# Must create new instance
new_query = GetUserQuery(user_id=456)  # ✅ OK

This ensures that queries/commands cannot be modified after creation, providing:

  • Thread safety

  • Predictable behavior

  • Prevention of accidental mutations in handlers

Observability Integration

Overview

BaseQuery and BaseCommand are designed to work seamlessly with modern observability tools like OpenTelemetry, Sentry, and Prometheus. They provide the crucial business context layer that complements infrastructure tracing.

Key Insight: Complementary, Not Redundant

Infrastructure Tracing vs Domain Tracing

Aspect

OpenTelemetry/Sentry/Prometheus

BaseQuery/BaseCommand

Purpose

Technical/infrastructure tracing

Business/domain context

What it tracks

HTTP calls, DB queries, spans, metrics

Who, what, when at domain level

IDs

Opaque trace IDs (abc123def456...)

Business IDs (request_id, order_id)

Searchability

By trace ID, span name

By user, email, business entity

Compliance

Must add explicit audit events

Built-in audit trail

Best for

“What’s slow?”, “Where did it fail?”

“Who did this?”, “What business operation?”

Bottom line: Infrastructure tools show HOW your system works. Your BaseQuery/BaseCommand fields provide WHO and WHAT at the business level.

Integration Pattern: The Bridge

Your BaseQuery/BaseCommand fields become the attributes you attach to OpenTelemetry spans and Sentry events.

Domain Layer (Your Code)
  ↓ BaseQuery/BaseCommand with business fields
  ↓
Handler Layer  
  ↓ Attach business fields to OTel span attributes
  ↓
Infrastructure Layer (OpenTelemetry)
  ↓ Technical tracing (HTTP, DB, cache, etc.)
  ↓
Observability Tools (Sentry/Prometheus/Jaeger)
  ↓ Unified view: Business context + Technical trace

OpenTelemetry Integration

Setup

pip install opentelemetry-api opentelemetry-sdk

Pattern 1: Attach Command/Query Context to Spans

Create a helper to bridge your domain fields to OpenTelemetry:

from opentelemetry import trace
from python_cqrs_core import BaseCommand, BaseQuery
from typing import Union

def attach_cqrs_context(
    span: trace.Span, 
    cqrs_object: Union[BaseCommand, BaseQuery]
):
    """Attach BaseCommand/BaseQuery fields to OpenTelemetry span."""
    span.set_attribute("business.request_id", str(cqrs_object.request_id))
    
    if cqrs_object.correlation_id:
        span.set_attribute("business.correlation_id", str(cqrs_object.correlation_id))
    
    if cqrs_object.requested_by:
        span.set_attribute("requested_by", cqrs_object.requested_by)
    
    span.set_attribute("requested_at", cqrs_object.requested_at.isoformat())
    
    # Determine if it's a command or query
    span.set_attribute(
        "cqrs.type", 
        "command" if isinstance(cqrs_object, BaseCommand) else "query"
    )
    span.set_attribute("cqrs.name", type(cqrs_object).__name__)

Pattern 2: Use in Your Handlers

from opentelemetry import trace

class CreateInviteCommandHandler:
    def __init__(self, repository):
        self.repository = repository
        self.tracer = trace.get_tracer("cqrs.write", "1.0.0")
    
    async def handle(self, command: CreateInviteCommand):
        with self.tracer.start_as_current_span("CreateInvite") as span:
            # Attach your domain context
            attach_cqrs_context(span, command)
            
            # Add business-specific attributes
            span.set_attribute("invite.email", command.email)
            span.set_attribute("invite.expires_in_days", command.expires_in_days)
            
            # Execute business logic
            result = await self.repository.create(command)
            
            # Add result attributes
            span.set_attribute("result.invite_id", result.id)
            span.set_attribute("result.token", result.token)
            
            return result

Pattern 3: Propagate Correlation IDs via Baggage

Use OpenTelemetry baggage to propagate your correlation IDs downstream:

from opentelemetry import baggage, context

async def handle(self, command: CreateInviteCommand):
    # Set correlation_id in baggage for downstream services
    correlation_id = str(command.correlation_id or command.request_id)
    
    ctx = baggage.set_baggage("correlation.id", correlation_id)
    
    with self.tracer.start_as_current_span("CreateInvite", context=ctx) as span:
        attach_cqrs_context(span, command)
        # All downstream services get correlation.id automatically!
        result = await self.repository.create(command)
        return result

Pattern 4: Separate Command and Query Tracers (Best Practice 2026)

Create separate tracers for read and write paths:

# In your application setup
write_tracer = trace.get_tracer("cqrs.write", "1.0.0")
read_tracer = trace.get_tracer("cqrs.read", "1.0.0")

# Command handlers use write tracer
class CreateInviteHandler:
    tracer = write_tracer

# Query handlers use read tracer  
class GetInviteHandler:
    tracer = read_tracer

This enables independent monitoring of write-side throughput vs read-side latency.

Sentry Integration

Setup

pip install sentry-sdk opentelemetry-sdk

Pattern 1: Automatic Trace Context Correlation

Initialize OpenTelemetry first, then Sentry with OpenTelemetry integration:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
import sentry_sdk
from sentry_sdk.integrations.opentelemetry import OpenTelemetryIntegration

# Initialize OpenTelemetry
trace.set_tracer_provider(TracerProvider())

# Initialize Sentry with OTel integration
sentry_sdk.init(
    dsn="your-dsn",
    integrations=[OpenTelemetryIntegration()],
    traces_sample_rate=1.0,
)

Now Sentry automatically picks up OpenTelemetry trace context, and your BaseCommand/BaseQuery fields flow through!

Pattern 2: Attach Context to Sentry Scope

import sentry_sdk

async def handle(self, command: CreateInviteCommand):
    with sentry_sdk.configure_scope() as scope:
        # Add your business context
        scope.set_tag("business.request_id", str(command.request_id))
        scope.set_tag("requested_by", command.requested_by)
        scope.set_context("command", {
            "type": type(command).__name__,
            "request_id": str(command.request_id),
            "correlation_id": str(command.correlation_id) if command.correlation_id else None,
            "requested_by": command.requested_by,
            "requested_at": command.requested_at.isoformat(),
        })
        
        # Execute command
        result = await self.repository.create(command)
        return result

Pattern 3: Search Sentry by Business Context

When errors occur, search Sentry by your domain fields:

# User reports: "My invite creation failed"
# User email: john@example.com

Search Sentry:
  requested_by:"john@example.com" AND type:"CreateInviteCommand"

Result:
  Error: DatabaseError
  ↳ business.request_id: 550e8400-e29b-41d4-a716-446655440000
  ↳ requested_by: john@example.com
  ↳ command.type: CreateInviteCommand
  ↳ OTel Trace ID: abc123def456...
  
Click trace ID → See full distributed trace!

Prometheus Integration

Pattern: Use request_id for Request Duration Metrics

from prometheus_client import Histogram
from datetime import datetime, timezone

# Define metric
command_duration = Histogram(
    'cqrs_command_duration_seconds',
    'Command execution duration',
    ['command_type', 'requested_by', 'status']
)

async def handle(self, command: CreateInviteCommand):
    start = command.requested_at
    status = "success"
    
    try:
        result = await self.repository.create(command)
        return result
    except Exception as e:
        status = "error"
        raise
    finally:
        duration = (datetime.now(timezone.utc) - start).total_seconds()
        
        # Record metric with business context
        command_duration.labels(
            command_type=type(command).__name__,
            requested_by=command.requested_by or "anonymous",
            status=status
        ).observe(duration)

Pattern: Counter by User

from prometheus_client import Counter

command_counter = Counter(
    'cqrs_commands_total',
    'Total commands processed',
    ['command_type', 'requested_by']
)

async def handle(self, command: CreateInviteCommand):
    # Increment counter with business context
    command_counter.labels(
        command_type=type(command).__name__,
        requested_by=command.requested_by or "anonymous"
    ).inc()
    
    result = await self.repository.create(command)
    return result

Now in Prometheus/Grafana you can query:

# Commands per user
sum by (requested_by) (cqrs_commands_total)

# P95 latency by command type
histogram_quantile(0.95, cqrs_command_duration_seconds)

Audit Trail for Compliance (SOC 2, HIPAA, PCI DSS)

Using OpenTelemetry Span Events (2026 Best Practice)

async def handle(self, command: CreateInviteCommand):
    with tracer.start_as_current_span("CreateInvite") as span:
        attach_cqrs_context(span, command)
        
        # Execute command
        result = await self.repository.create(command)
        
        # Record audit event
        span.add_event(
            "invite.created",
            attributes={
                # Compliance fields
                "audit.action": "CREATE_INVITE",
                "audit.actor": command.requested_by or "system",
                "audit.resource_type": "invite",
                "audit.resource_id": str(result.id),
                "audit.timestamp": command.requested_at.isoformat(),
                
                # Business context
                "audit.business_request_id": str(command.request_id),
                "audit.correlation_id": str(command.correlation_id) if command.correlation_id else None,
                
                # Data classification
                "compliance.data_classification": "PII",
                "compliance.contains_email": True,
                
                # Result
                "audit.result": "success",
            }
        )
        
        return result

This creates audit trail entries that:

  • Automatically correlate with the originating request

  • Are stored in your tracing backend

  • Support compliance requirements

  • Can be queried for reports

Real-World Example: Full Integration

Production Incident Investigation

Step 1: User Reports Issue

"My order ORDER-12345 failed 10 minutes ago" - john@example.com

Step 2: Search Application Logs (BaseCommand Fields)

grep "requested_by=john@example.com" logs.txt | grep "ORDER-12345"

[550e8400-...] CreateOrderCommand | order_id=ORDER-12345 | 
  requested_by=john@example.com | at=2026-02-26 10:20:45

Step 3: Use request_id to Find OTel Trace in Sentry

# Search Sentry for: business.request_id=550e8400-...

Step 4: See Full Distributed Trace

Trace: abc123def456...
├─ HTTP POST /orders [200ms]
├─ CreateOrderCommand [business.request_id: 550e8400] [150ms]
│  ├─ ValidatePayment [50ms] ✅
│  ├─ CheckInventory [30ms] ✅
│  └─ SaveOrder [70ms] ❌ DATABASE_ERROR
└─ HTTP Response [500] ❌

Error: Connection timeout to postgres
Sentry Context:
  - requested_by: john@example.com
  - business.request_id: 550e8400-...
  - order.id: ORDER-12345

Result:

  • Found the exact user’s request via your domain fields

  • Saw the full technical trace via OpenTelemetry

  • Diagnosed root cause: Database connection issue

Without your BaseCommand fields: You’d search through 1000s of traces with no business context.

Complete Handler Example

Here’s a production-ready handler with full observability:

from opentelemetry import trace, baggage
from prometheus_client import Histogram, Counter
import sentry_sdk
from python_cqrs_core import BaseCommand, ICommandHandler

class CreateInviteCommand(BaseCommand):
    email: str
    expires_in_days: int

class CreateInviteHandler(ICommandHandler[CreateInviteCommand, InviteResult]):
    def __init__(self, repository, metrics_registry):
        self.repository = repository
        self.tracer = trace.get_tracer("cqrs.write", "1.0.0")
        
        # Prometheus metrics
        self.duration_histogram = Histogram(
            'create_invite_duration_seconds',
            'CreateInvite command duration',
            ['requested_by', 'status']
        )
        self.counter = Counter(
            'create_invite_total',
            'Total CreateInvite commands',
            ['requested_by']
        )
    
    async def handle(self, command: CreateInviteCommand) -> InviteResult:
        # Set up correlation
        correlation_id = str(command.correlation_id or command.request_id)
        ctx = baggage.set_baggage("correlation.id", correlation_id)
        
        status = "success"
        
        with self.tracer.start_as_current_span(
            "CreateInvite", 
            context=ctx
        ) as span:
            try:
                # Attach business context to OTel span
                span.set_attribute("business.request_id", str(command.request_id))
                span.set_attribute("requested_by", command.requested_by or "anonymous")
                span.set_attribute("cqrs.type", "command")
                span.set_attribute("cqrs.command.type", "CreateInvite")
                span.set_attribute("invite.email", command.email)
                
                # Attach context to Sentry
                with sentry_sdk.configure_scope() as scope:
                    scope.set_tag("business.request_id", str(command.request_id))
                    scope.set_tag("requested_by", command.requested_by)
                    scope.set_context("command", {
                        "type": "CreateInviteCommand",
                        "request_id": str(command.request_id),
                        "email": command.email,
                    })
                
                # Increment Prometheus counter
                self.counter.labels(
                    requested_by=command.requested_by or "anonymous"
                ).inc()
                
                # Execute business logic
                result = await self.repository.create(command)
                
                # Record audit event
                span.add_event(
                    "invite.created",
                    attributes={
                        "audit.action": "CREATE_INVITE",
                        "audit.actor": command.requested_by or "system",
                        "audit.resource_id": str(result.id),
                        "audit.timestamp": command.requested_at.isoformat(),
                    }
                )
                
                return result
                
            except Exception as e:
                status = "error"
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                span.record_exception(e)
                raise
            
            finally:
                # Record duration in Prometheus
                duration = (datetime.now(timezone.utc) - command.requested_at).total_seconds()
                self.duration_histogram.labels(
                    requested_by=command.requested_by or "anonymous",
                    status=status
                ).observe(duration)

Best Practices (2026)

1. Use Separate Tracers for Commands and Queries

# Application setup
write_tracer = trace.get_tracer("cqrs.write", "1.0.0")
read_tracer = trace.get_tracer("cqrs.read", "1.0.0")

# Command handlers
class MyCommandHandler:
    tracer = write_tracer

# Query handlers
class MyQueryHandler:
    tracer = read_tracer

This enables independent performance analysis of read vs write paths.

2. Propagate Business Correlation IDs

Always set correlation IDs in baggage:

correlation_id = str(command.correlation_id or command.request_id)
ctx = baggage.set_baggage("correlation.id", correlation_id)

with tracer.start_as_current_span("MyOperation", context=ctx):
    # Downstream services automatically get correlation.id
    await self.external_service.call()

3. Structure Audit Events Consistently

Use a consistent schema for all audit events:

AUDIT_EVENT_SCHEMA = {
    "audit.action": "string",           # CREATE_USER, DELETE_ORDER, etc.
    "audit.actor": "string",            # Who (from requested_by)
    "audit.resource_type": "string",    # What type (user, order, invite)
    "audit.resource_id": "string",      # Which one (user_id, order_id)
    "audit.timestamp": "ISO8601",       # When (from requested_at)
    "audit.business_request_id": "UUID", # Unique identifier
    "audit.result": "string",           # success, denied, error
}

4. Use request_id for Log Correlation

Structure your logs to include request_id:

import logging
import structlog

logger = structlog.get_logger()

async def handle(self, command: CreateInviteCommand):
    # Bind request_id to all logs in this context
    log = logger.bind(
        request_id=str(command.request_id),
        requested_by=command.requested_by,
        command_type=type(command).__name__
    )
    
    log.info("processing_command")
    result = await self.repository.create(command)
    log.info("command_completed", invite_id=result.id)
    
    return result

Query Examples

Find All Operations by User

Application logs:

grep 'requested_by=john@example.com' app.log

Sentry:

Search: requested_by:"john@example.com"

Jaeger (OpenTelemetry):

Tags: requested_by="john@example.com"

Trace Request Flow

Using correlation_id:

# Find initial request
grep 'request_id=550e8400' app.log

# Find all related operations
grep 'correlation_id=550e8400' app.log

In Jaeger:

Search by baggage: correlation.id="550e8400..."
→ Shows all connected operations across services

Summary: Why Keep BaseQuery/BaseCommand with OTel/Sentry?

They Solve Different Problems

Problem

Solution

“Which request failed?”

request_id (yours)

“Where in the code did it fail?”

OpenTelemetry spans

“Who made this request?”

requested_by (yours)

“What was the error?”

Sentry error tracking

“How long did it take?”

Both (requested_at + OTel timing)

“What’s the related operation?”

correlation_id (yours) + OTel baggage

Your Fields Are the Bridge

User Reports Issue
  ↓
Search by business context (requested_by, order_id)
  ↓ [YOUR FIELDS]
Find request_id in logs
  ↓
Search Sentry/Jaeger by business.request_id
  ↓ [INFRASTRUCTURE TOOLS]
View full distributed trace
  ↓
Diagnose technical root cause

Without your BaseCommand fields: Infrastructure tools have no business context.
Without infrastructure tools: Your fields have no technical details.

Together: Complete observability from business to infrastructure! 🎯

Why Use BaseQuery/BaseCommand?

Overview

While BaseQuery and BaseCommand extend Pydantic’s BaseModel, they provide critical production-ready features that go beyond basic validation.

Benefits

1. Zero-Effort Observability

Every query/command automatically gets tracing fields without manual declaration:

# Plain Pydantic (NO observability)
class GetUserQuery(BaseModel):
    user_id: int
    # No way to track this request through logs

# With BaseQuery (FULL observability)
class GetUserQuery(BaseQuery):
    user_id: int
    # Automatically includes: request_id, correlation_id, requested_by, requested_at

2. Production Debugging

Without BaseQuery:

# Handler logging
async def handle(self, query: GetUserQuery):
    logger.info("Processing GetUser")  # ❌ Which request? Who? When?
    return await self.repo.get(query.user_id)

With BaseQuery:

# Handler logging
async def handle(self, query: GetUserQuery):
    logger.info(
        f"[{query.request_id}] Processing GetUser "
        f"for user_id={query.user_id} by {query.requested_by}"
    )
    result = await self.repo.get(query.user_id)
    
    duration = (datetime.now(timezone.utc) - query.requested_at).total_seconds()
    logger.info(f"[{query.request_id}] Retrieved user in {duration}s")
    return result

Now in logs:

[550e8400-...] Processing GetUser for user_id=123 by john@example.com
[550e8400-...] Retrieved user in 0.342s

3. Distributed Tracing

Link related operations across your system:

# Initial command
save_cmd = SaveStateCommand(
    token="abc123",
    entity_id=1,
    requested_by="john@example.com"
)
await handler.handle(save_cmd)
# request_id: 550e8400-e29b-41d4-a716-446655440000

# Related command - link them!
finalize_cmd = FinalizeSessionCommand(
    token="abc123",
    correlation_id=save_cmd.request_id,  # ← Trace the relationship
    requested_by="john@example.com"
)
await handler.handle(finalize_cmd)

# Logs show the connection:
# [550e8400] SaveState for token=abc123
# [abc12345 | correlation: 550e8400] FinalizeSession for token=abc123

4. Audit Trail for Compliance

Track who did what and when:

cmd = CreateInviteCommand(
    email="user@example.com",
    expires_in_days=7,
    requested_by="admin@example.com"  # ← Audit trail
)

# Later, in your database/logs:
# "Invite created by admin@example.com at 2026-02-26 10:30:45 [request: 550e8400]"

5. Built-in Immutability

Prevent accidental mutations:

query = GetUserQuery(user_id=123)
query.user_id = 456  # ❌ FrozenInstanceError
# Ensures queries/commands can't be tampered with after creation

6. Performance Monitoring

Track execution times automatically:

async def handle(self, query: GetUserQuery):
    start = query.requested_at
    result = await self.repo.get(query.user_id)
    duration = (datetime.now(timezone.utc) - start).total_seconds()
    
    # Send to monitoring system
    metrics.record_query_duration(
        query_type="GetUser",
        duration=duration,
        request_id=str(query.request_id)
    )
    return result

When to Use Plain BaseModel vs BaseQuery/BaseCommand

Use Case

Use Plain BaseModel

Use BaseQuery/BaseCommand

API Request/Response DTOs

✅ Yes

❌ No (too many fields)

Internal Data Transfer

✅ Yes

❌ No (overhead)

CQRS Commands

❌ No

YES (tracing needed)

CQRS Queries

❌ No

YES (tracing needed)

Domain Events

✅ Maybe

✅ Maybe (if tracing needed)

Configuration Objects

✅ Yes

❌ No (static data)

Real-World Scenario

Imagine debugging a production issue:

❌ Without BaseQuery:
User reports: "My state save failed 10 minutes ago"
You search logs: 
  "Error: Token not found"  
  "Error: Token not found"  
  "Error: Token not found"
→ Which error is theirs? No way to know. No timestamp. No user ID.

✅ With BaseQuery:
User reports: "My state save failed 10 minutes ago" 
You search logs for their email in requested_by:
  "[550e8400] Error: Token not found | token=abc123 | user=john@example.com | at=2026-02-26 10:20:45"
→ Found it immediately! Can now trace the full request flow.

The Bottom Line

BaseQuery/BaseCommand = BaseModel + Production-Ready Observability

  • Same Pydantic validation

  • Same performance

  • + Automatic tracing

  • + Audit trail

  • + Immutability

  • + Debug-friendly

Cost: 4 optional fields (~100 bytes per instance)
Benefit: Complete production observability and audit trail

Use BaseQuery/BaseCommand for all CQRS operations. Use plain BaseModel for everything else.

Design Patterns

CQRS Pattern

Separates read and write operations:

  • Commands: Modify state (CREATE, UPDATE, DELETE)

  • Queries: Read state (GET, LIST)

This separation provides:

  • Independent scaling of reads and writes

  • Optimized data models for each operation type

  • Clear intent in your codebase

Handler Pattern

Each command/query has a dedicated handler:

  • Single responsibility: Each handler does one thing

  • Easy to test: Isolated business logic

  • Composable with middleware: Add logging, validation, authorization

Example:

# Command and its dedicated handler
class CreateUserCommand(BaseCommand):
    name: str
    email: str

class CreateUserHandler(ICommandHandler[CreateUserCommand, int]):
    async def handle(self, command: CreateUserCommand) -> int:
        # Single responsibility: create a user
        return await self.user_repository.create(command)

# Query and its dedicated handler
class GetUserQuery(BaseQuery):
    user_id: int

class GetUserHandler(IQueryHandler[GetUserQuery, User]):
    async def handle(self, query: GetUserQuery) -> User:
        # Single responsibility: fetch a user
        return await self.user_repository.get(query.user_id)