python-outbox-core¶
Core abstractions for the transactional outbox pattern (Kafka, FastStream, RabbitMQ).
Installation¶
pip install python-outbox-core
Public API¶
Class |
Purpose |
|---|---|
|
Abstract event base (Pydantic) |
|
Persistence interface |
|
Broker publishing interface |
|
Reusable worker logic |
|
Worker configuration |
|
Per-event error handling |
|
Structured logging metrics |
|
CloudEvents 1.0 formatting |
|
Health check interface |
Quick reference¶
Outbox Core - Quick Reference¶
NOTE: No custom serializer needed! Use Pydantic’s model_dump_json() & model_validate_json(). FastStream auto-serializes dicts.
💡 RECOMMENDED: FastStream (ag2ai) - cutting-edge for event-driven systems, ideal for startups. Custom serializers welcome.
API¶
# Event
class IOutboxEvent(BaseModel, ABC):
event_id: UUID; event_type: str; aggregate_id: str
occurred_at: datetime; source: str; data_version: str = "1.0"
@abstractmethod
def to_message(self) -> dict: ...
# Repository
class IOutboxRepository(ABC):
async def add_event(event) -> None
async def get_unpublished(limit=100) -> List[IOutboxEvent]
async def mark_published(event_id: UUID) -> None
async def mark_failed(event_id: UUID, error: str) -> None
# Publisher
class IEventPublisher(ABC):
async def publish(message: dict) -> None # FastStream handles JSON!
# Worker
class OutboxPublisherBase(ABC):
async def publish_batch(limit=100) -> int
@abstractmethod
async def schedule_publishing() -> None
Usage¶
# 1. Define
class UserCreated(IOutboxEvent):
user_id: UUID; email: str
event_type: str = "user.created"; source: str = "api"
def to_message(self): return {"id": str(self.event_id), "data": {...}}
# 2. Store (atomic with domain entity)
await outbox_repo.add_event(UserCreated(...))
await db_session.commit()
# 3. Worker
class Worker(OutboxPublisherBase):
async def schedule_publishing(self):
while True: await self.publish_batch(); await asyncio.sleep(5)
# 4. Repository (SQLAlchemy)
class Repo(IOutboxRepository):
async def add_event(self, e):
self.session.add(OutboxORM(
id=e.event_id,
payload=e.model_dump_json() # Pydantic built-in!
))
async def get_unpublished(self, limit=100):
result = await self.session.execute(
select(OutboxORM).where(OutboxORM.published==False).limit(limit)
)
return [EventClass.model_validate_json(orm.payload) for orm in result.scalars()]
# 5. Publisher (Kafka via FastStream)
class KafkaPub(IEventPublisher):
async def publish(self, msg):
# FastStream auto-serializes dict → JSON!
await self.broker.publish(msg, topic=msg["type"])
Config (Env)¶
OUTBOX_BATCH_SIZE=100
OUTBOX_POLL_INTERVAL_SECONDS=5
Import¶
from outbox_sdk.core import (
IOutboxEvent, IOutboxRepository, IEventPublisher,
OutboxPublisherBase, OutboxConfig, OutboxErrorHandler, OutboxMetrics
)
Serialization Notes¶
No custom serializer needed!
DB Storage: Use `event.model_dump_json()` (Pydantic v2)
DB Loading: Use `EventClass.model_validate_json(json_str)` (Pydantic v2)
Kafka Publishing: FastStream auto-serializes `dict` → JSON
Pattern¶
Handler → DB(entity + outbox) → Worker → Kafka
└──── ATOMIC ────┘
✅ Atomic | ✅ At-least-once | ✅ No loss | ✅ DLQ
Detailed guides¶
Outbox Core - API Quick Reference¶
Version: 0.1.0 | Status: ✅ Production Ready
📦 Installation¶
pip install outbox-sdk
# or
poetry add outbox-sdk
🔌 Core API¶
Events¶
from outbox_sdk.core import IOutboxEvent
class IOutboxEvent(BaseModel, ABC):
# Required
event_id: UUID
event_type: str # "com.company.domain.action"
aggregate_id: str
occurred_at: datetime
source: str # Service name
# Versioning
data_version: str = "1.0"
# Tracing (optional)
correlation_id: UUID | None
causation_id: UUID | None
@abstractmethod
def to_message(self) -> dict[str, Any]:
"""Convert to broker format."""
Repository¶
from outbox_sdk.core import IOutboxRepository
class IOutboxRepository(ABC):
async def add_event(event: IOutboxEvent) -> None
async def get_unpublished(limit: int = 100, offset: int = 0) -> List[IOutboxEvent]
async def mark_published(event_id: UUID) -> None
async def mark_failed(event_id: UUID, error_message: str) -> None
async def count_unpublished() -> int
Publisher¶
from outbox_sdk.core import IEventPublisher
class IEventPublisher(ABC):
async def publish(message: dict[str, Any]) -> None
Worker¶
from outbox_sdk.core import OutboxPublisherBase
class OutboxPublisherBase(ABC):
def __init__(
repository: IOutboxRepository,
publisher: IEventPublisher,
error_handler: OutboxErrorHandler | None = None,
metrics: OutboxMetrics | None = None,
)
async def publish_batch(limit: int = 100) -> int
@abstractmethod
async def schedule_publishing() -> None
Error Handling¶
from outbox_sdk.core import OutboxErrorHandler
class OutboxErrorHandler:
def __init__(logger: Any = None, max_retries: int = 3)
def handle(event: Any, exception: Exception) -> None
def should_retry(event: Any, exception: Exception, attempt: int) -> bool
def is_transient_error(exception: Exception) -> bool
Metrics¶
from outbox_sdk.core import OutboxMetrics
class OutboxMetrics:
def __init__(logger: Any = None)
def log_no_events() -> None
def log_success(event: Any) -> None
def log_batch_complete(published: int, total: int) -> None
def log_batch_started(batch_size: int) -> None
Configuration¶
from outbox_sdk.core import OutboxConfig
class OutboxConfig(BaseModel):
batch_size: int = 100 # 1-1000
poll_interval_seconds: int = 5 # 1-3600
max_retry_count: int = 3 # 0-100
retry_backoff_multiplier: float = 2.0 # 1.0-10.0
enable_metrics: bool = True
enable_health_check: bool = True
# Env vars: OUTBOX_BATCH_SIZE, OUTBOX_POLL_INTERVAL_SECONDS, etc.
Health Checks¶
from outbox_sdk.core import OutboxHealthCheck, HealthStatus
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class OutboxHealthCheck(ABC):
async def check_health() -> Dict[str, Any]
async def check_database() -> Dict[str, Any]
async def check_broker() -> Dict[str, Any]
async def check_outbox_lag() -> Dict[str, Any]
Serialization¶
NOTE: No custom serializer!
💡 RECOMMENDED: FastStream (ag2ai) - cutting-edge event-driven framework, especially for startups. Custom serializers supported if needed.
In most cases just use Pydantic v2 built-in methods:
# Serialize to JSON string (for PostgreSQL JSONB)
json_str = event.model_dump_json()
# Deserialize from JSON string
event = UserCreatedEvent.model_validate_json(json_str)
# For Kafka: FastStream auto-serializes dicts!
await broker.publish(event.to_message()) # No manual JSON needed
🚀 Quick Start¶
1. Define Event¶
from outbox_sdk.core import IOutboxEvent
from uuid import UUID, uuid4
from datetime import datetime
from pydantic import Field
class UserCreatedEvent(IOutboxEvent):
user_id: UUID
email: str
event_id: UUID = Field(default_factory=uuid4)
event_type: str = "com.app.user.created"
aggregate_id: str = Field(alias="user_id")
occurred_at: datetime = Field(default_factory=datetime.utcnow)
source: str = "user-service"
def to_message(self) -> dict:
return {
"id": str(self.event_id),
"type": self.event_type,
"data": {"user_id": str(self.user_id), "email": self.email}
}
2. Store in Transaction¶
async def create_user(cmd, user_repo, outbox_repo, db_session):
user = await user_repo.create(cmd.to_entity())
event = UserCreatedEvent(user_id=user.id, email=user.email)
await outbox_repo.add_event(event)
await db_session.commit() # Atomic!
3. Implement Worker¶
from outbox_sdk.core import OutboxPublisherBase
import asyncio
class MyWorker(OutboxPublisherBase):
async def schedule_publishing(self):
while True:
await self.publish_batch(limit=100)
await asyncio.sleep(5)
4. Implement Repository¶
from outbox_sdk.core import IOutboxRepository
from sqlalchemy.ext.asyncio import AsyncSession
class SQLAlchemyOutboxRepo(IOutboxRepository):
def __init__(self, session: AsyncSession):
self.session = session
async def add_event(self, event):
orm_event = OutboxEventORM(
id=event.event_id,
event_type=event.event_type,
payload=event.model_dump_json(), # Pydantic built-in!
# ...
)
self.session.add(orm_event)
# No commit - caller handles!
async def get_unpublished(self, limit=100, offset=0):
result = await self.session.execute(
select(OutboxEventORM)
.where(OutboxEventORM.published == False)
.order_by(OutboxEventORM.occurred_at.asc())
.limit(limit)
)
# Deserialize using Pydantic built-in
events = []
for orm in result.scalars().all():
event_class = self.event_registry[orm.event_type]
events.append(event_class.model_validate_json(orm.payload))
return events
async def mark_published(self, event_id):
await self.session.execute(
update(OutboxEventORM)
.where(OutboxEventORM.id == event_id)
.values(published=True, published_at=datetime.utcnow())
)
async def mark_failed(self, event_id, error_message):
await self.session.execute(
update(OutboxEventORM)
.where(OutboxEventORM.id == event_id)
.values(failed=True, error_message=error_message)
)
async def count_unpublished(self):
result = await self.session.execute(
select(func.count(OutboxEventORM.id))
.where(OutboxEventORM.published == False)
)
return result.scalar()
5. Implement Publisher¶
from outbox_sdk.core import IEventPublisher
from faststream.kafka import KafkaBroker
class KafkaPublisher(IEventPublisher):
def __init__(self, broker: KafkaBroker):
self.broker = broker
async def publish(self, message: dict):
# NOTE: FastStream auto-serializes dict → JSON!
await self.broker.publish(
message,
topic=message.get("type"),
key=message.get("aggregate_id")
)
🎯 Full Import Example¶
from outbox_sdk.core import (
# Contracts
IOutboxEvent,
IOutboxRepository,
IEventPublisher,
# Worker
OutboxPublisherBase,
OutboxErrorHandler,
OutboxMetrics,
# Utils
OutboxConfig,
# Health
OutboxHealthCheck,
HealthStatus,
)
# NOTE: No OutboxSerializer - use Pydantic's model_dump_json() instead!
📊 Architecture¶
┌─────────────────────────────────────────────────────────┐
│ Command Handler │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. Create domain entity │ │
│ │ 2. Save entity (user_repo.save) │ │
│ │ 3. Create event (UserCreatedEvent) │ │
│ │ 4. Store event (outbox_repo.add_event) │ │
│ │ 5. db_session.commit() ← ATOMIC! │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Database (PostgreSQL) │
│ ┌──────────────┐ ┌──────────────────────────────────┐ │
│ │ users table │ │ outbox_events table │ │
│ │ (domain) │ │ - id, event_type, payload │ │
│ │ │ │ - published (false → true) │ │
│ └──────────────┘ └──────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Background Worker (OutboxPublisherBase) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Every 5s: │ │
│ │ 1. Fetch unpublished (outbox_repo.get_unpublished) │ │
│ │ 2. Publish to Kafka (publisher.publish) │ │
│ │ 3. Mark published (outbox_repo.mark_published) │ │
│ │ 4. Handle errors (error_handler.handle) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Kafka / Message Broker │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Topic: com.app.user.created │ │
│ │ Message: {"id": "...", "data": {...}} │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Consumers (Other Services) │
└─────────────────────────────────────────────────────────┘
✅ Key Guarantees¶
Guarantee |
How? |
|---|---|
Atomicity |
Domain entity + outbox event in same DB transaction |
At-least-once delivery |
Events persisted before publishing |
No message loss |
Events durable in DB, retried on failure |
Idempotency |
|
FIFO ordering |
Per-aggregate via |
Dead Letter Queue |
|
🔧 Configuration via Environment¶
export OUTBOX_BATCH_SIZE=200
export OUTBOX_POLL_INTERVAL_SECONDS=10
export OUTBOX_MAX_RETRY_COUNT=5
export OUTBOX_ENABLE_METRICS=true
from outbox_sdk.core import OutboxConfig
config = OutboxConfig() # Auto-loads from env
print(config.batch_size) # 200
📈 Monitoring¶
Health Check Endpoint¶
from fastapi import FastAPI
from outbox_sdk.core import OutboxHealthCheck, HealthStatus
class MyHealthCheck(OutboxHealthCheck):
async def check_health(self):
return {
"status": HealthStatus.HEALTHY,
"checks": {
"database": await self.check_database(),
"broker": await self.check_broker(),
"outbox_lag": await self.check_outbox_lag(),
}
}
app = FastAPI()
health_check = MyHealthCheck()
@app.get("/health")
async def health():
return await health_check.check_health()
Structured Logs¶
{
"event": "outbox.event_published",
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"event_type": "com.app.user.created",
"aggregate_id": "user-123",
"timestamp": "2024-01-01T12:00:00Z"
}
{
"event": "outbox.batch_complete",
"published": 95,
"total": 100,
"success_rate": 0.95,
"timestamp": "2024-01-01T12:00:05Z"
}
🧪 Testing¶
Mock Repository¶
class InMemoryOutboxRepo(IOutboxRepository):
def __init__(self):
self.events = []
async def add_event(self, event):
self.events.append(event)
async def get_unpublished(self, limit=100, offset=0):
return [e for e in self.events if not e.published][:limit]
Mock Publisher¶
class FakePublisher(IEventPublisher):
def __init__(self):
self.published_messages = []
async def publish(self, message):
self.published_messages.append(message)
Unit Test¶
async def test_outbox_worker():
repo = InMemoryOutboxRepo()
publisher = FakePublisher()
worker = MyWorker(repo, publisher)
await repo.add_event(UserCreatedEvent(...))
published_count = await worker.publish_batch()
assert published_count == 1
assert len(publisher.published_messages) == 1
📚 See Also¶
Full API Docs: `CORE_API.md`
CloudEvents Spec: https://cloudevents.io/
Transactional Outbox: `../../architecture/06_EVENT_PATTERNS.md`
Lines of Code: 732 total | Files: 9 | Avg: 81 lines/file
Transactional Outbox Implementation Guide¶
✅ Pre-Implementation Checklist¶
[ ] Identify domain events that need reliable publishing
[ ] Choose message broker (Kafka strongly recommended)
[ ] Decide on worker scheduling strategy
[ ] Plan monitoring & alerting approach
📋 Step-by-Step Implementation¶
Phase 1: Core Setup¶
[ ] Install `python-outbox-core` library
[ ] Create ORM model for outbox table (extend `IOutboxEvent`)
[ ] Implement `IOutboxRepository` with SQLAlchemy
[ ] Add database migration for outbox table
Phase 2: Event Publishing¶
[ ] Define domain events (inherit from `IOutboxEvent`)
[ ] Implement `to_message()` for each event
[ ] Update command handlers to write events to outbox
Phase 3: Worker Setup¶
[ ] Implement `IEventPublisher` for your broker (Kafka strongly recommended)
[ ] Create worker class extending `OutboxPublisherBase`
[ ] Implement `schedule_publishing()` method
[ ] Configure worker startup/shutdown
Phase 4: Monitoring¶
[ ] Add metrics for outbox lag (`count_unpublished()`)
[ ] Set up alerts for publishing failures
[ ] Monitor batch processing performance
[ ] Add health checks
🔧 Integration Patterns¶
Pattern A: FastAPI (lifespan) + Kafka + FastStream + Celery + Kong Events Gate¶
// TODO: (code example)
🔄 Lifespan implementation¶
What Is Lifespan?¶
Lifespan is FastAPI’s way to run code before your app starts and after it shuts down. It’s like “hooks” for startup/shutdown.
Your Current Lifespan (app_factory.py)¶
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events."""
# ⬇️ STARTUP - runs ONCE when FastAPI starts
logger.info("GridFlow API starting up")
await init_db() # Initialize database connection pool
logger.info("Database initialized successfully")
yield # ← App is now running and serving requests
# ⬇️ SHUTDOWN - runs ONCE when FastAPI stops
logger.info("GridFlow API shutting down")
Flow:
1. Server starts (uvicorn run)
↓
2. Lifespan STARTUP runs
- Initialize database
- Log startup
↓
3. yield ← App is now ready
↓
4. App serves requests... (your API endpoints work)
↓
5. Server stops (Ctrl+C or crash)
↓
6. Lifespan SHUTDOWN runs
- Cleanup resources
- Log shutdown
🎯 Where Outbox Worker Fits¶
You need to start the outbox worker in the lifespan startup!
Current (Without Outbox):¶
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("GridFlow API starting up")
await init_db()
yield
# Shutdown
logger.info("GridFlow API shutting down")
Updated (With Outbox Worker):¶
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("GridFlow API starting up")
await init_db()
# 🆕 Start outbox worker as background task
outbox_worker = create_outbox_worker() # Your worker instance
worker_task = asyncio.create_task(outbox_worker.schedule_publishing())
logger.info("Outbox worker started")
yield # App is running
# Shutdown
logger.info("GridFlow API shutting down")
# 🆕 Stop outbox worker gracefully
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
logger.info("Outbox worker stopped")
📊 Complete Flow with Outbox¶
┌─────────────────────────────────────────────────────────┐
│ 1. FastAPI Server Starts (uvicorn) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 2. Lifespan STARTUP │
│ ├─ Initialize database (init_db) │
│ ├─ Create outbox worker │
│ └─ Start background task (asyncio.create_task) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 3. App Running (yield) │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Main FastAPI Process │ │
│ │ - Handles HTTP requests │ │
│ │ - Command handlers store events to outbox │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Background Outbox Worker (asyncio task) │ │
│ │ - Polls outbox table every 5s │ │
│ │ - Publishes events to Kafka │ │
│ │ - Marks events as published │ │
│ │ - Runs in parallel with FastAPI │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 4. Server Stops (Ctrl+C) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 5. Lifespan SHUTDOWN │
│ ├─ Cancel worker task │
│ ├─ Wait for graceful stop │
│ └─ Close database connections │
└─────────────────────────────────────────────────────────┘
💡 Why Use Lifespan for Outbox Worker?¶
Option A: Lifespan (Recommended for Single Server)¶
✅ Pros:
Simple - worker runs in same process as FastAPI
No separate Celery infrastructure
Graceful shutdown
Good for MVP/small scale
❌ Cons:
Single worker (no horizontal scaling)
Worker restarts when app restarts
Use when:
MVP or small-scale app
Running 1-2 FastAPI instances
Don’t want Celery complexity
Option B: Celery (Recommended for Production)¶
✅ Pros:
Distributed workers (horizontal scaling)
Workers independent of FastAPI
Can restart FastAPI without stopping workers
Better for high event volume
❌ Cons:
Requires Redis/RabbitMQ
More complex infrastructure
Separate worker process to manage
Use when:
Production scale
Multiple FastAPI instances
High event throughput
Need worker redundancy
🔧 Your Implementation Options¶
Option 1: Simple Lifespan Worker (Start Here)¶
# backend/src/infrastructure/app_factory.py
from contextlib import asynccontextmanager
import asyncio
from backend.src.infrastructure.messaging.outbox_worker import create_outbox_worker
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan with outbox worker."""
# Startup
logger.info("GridFlow API starting up")
await init_db()
# Start outbox worker
worker = create_outbox_worker()
worker_task = asyncio.create_task(worker.schedule_publishing())
logger.info("Outbox worker started in background")
yield # App is running
# Shutdown
logger.info("GridFlow API shutting down")
# Stop worker gracefully
worker_task.cancel()
try:
await asyncio.wait_for(worker_task, timeout=10.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
logger.info("Outbox worker stopped")
Option 2: Celery Worker (Later/Production)¶
# backend/src/infrastructure/app_factory.py
# (NO changes to lifespan - Celery runs separately)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan - Celery worker runs separately."""
# Startup
logger.info("GridFlow API starting up")
await init_db()
yield
# Shutdown
logger.info("GridFlow API shutting down")
# Separate Celery worker process:
# celery -A backend.celery_worker worker --loglevel=info
# celery -A backend.celery_worker beat --loglevel=info
simple startup integration:
Start with Option 1 (Lifespan)¶
Why:
✅ Simpler to implement initially
✅ No Celery infrastructure needed yet
✅ Good for testing outbox pattern
✅ Can migrate to Celery later without changing core logic
Later, move to Option 2 (Celery) when:
You need to scale (multiple workers)
Event volume is high
You need worker monitoring
You already use Celery for other tasks
📋 What You Need to Add¶
Outbox worker factory:
# backend/src/infrastructure/messaging/outbox_worker.py
from outbox_sdk.core import OutboxPublisherBase
from outbox_sdk.integrations.faststream import FastStreamKafkaPublisher
class GridFlowOutboxWorker(OutboxPublisherBase):
async def schedule_publishing(self):
"""Simple polling loop."""
while True:
await self.publish_batch(limit=100)
await asyncio.sleep(5)
def create_outbox_worker() -> GridFlowOutboxWorker:
"""Factory to create configured worker."""
repo = get_outbox_repository() # Your SQLAlchemy repo
publisher = FastStreamKafkaPublisher(kafka_broker) # Your Kafka publisher
return GridFlowOutboxWorker(repo, publisher)
Update lifespan (as shown above)
Done! Worker runs automatically when FastAPI starts.
TL;DR: Lifespan = FastAPI’s startup/shutdown hooks. You’ll start your outbox worker there as a background task, running in parallel with your API. 🚀
FAQ¶
Do we even need python-outbox-core if we use FastStream? Could FastStream (by ag2ai) replace it on its own, or could Celery or Huey (by coleifer)?¶
TL;DR: they all solve DIFFERENT problems.¶
📊 What Each Tool Does¶
Tool |
Purpose |
What It Provides |
|---|---|---|
FastStream |
Message broker framework |
Kafka/RabbitMQ abstraction, serialization, async consumers |
Celery/Huey |
Task queue |
Background job scheduling, retries, result storage |
python-outbox-core |
Transactional Outbox pattern |
Atomic DB writes + event publishing guarantee |
🔍 The Confusion: Why They Seem Similar¶
FastStream (by ag2ai)¶
What it does:
✅ High-level Kafka/RabbitMQ API
✅ Auto-serialization (Pydantic models)
✅ Message routing
✅ Consumer decorators
What it DOESN’T do:
❌ No database integration
❌ No transactional guarantees
❌ No outbox table
❌ Doesn’t solve dual-write problem
Example:
from faststream.kafka import KafkaBroker
broker = KafkaBroker()
@broker.subscriber("events")
async def handle_event(msg: dict):
print(msg)
# Publishing
await broker.publish({"user_id": "123"}, topic="events")
Problem: If your DB write succeeds but Kafka is down, the event is lost!
Huey (by coleifer)¶
What it does:
✅ Task queue (like Celery but simpler)
✅ Task scheduling
✅ Retries
✅ Result storage (Redis)
What it DOESN’T do:
❌ No event streaming (it’s for tasks, not events)
❌ No database integration
❌ No transactional guarantees
❌ Not designed for outbox pattern
Example:
from huey import RedisHuey
huey = RedisHuey('my-app')
@huey.task()
def send_email(user_id):
# Background task
pass
send_email.schedule(args=('user123',), delay=60)
Problem: Same as FastStream - no atomicity with DB writes!
python-outbox-core (our library)¶
What it does:
✅ Atomic DB + event storage (same transaction!)
✅ Dual-write problem solved
✅ Event durability (stored in DB)
✅ Retry logic (worker polls outbox table)
✅ Works WITH FastStream + Celery/Huey (not instead of)
Example:
# Command handler
async def create_user(cmd, user_repo, outbox_repo, db_session):
user = await user_repo.create(cmd.to_entity())
event = UserCreated(user_id=user.id)
await outbox_repo.add_event(event) # ← Same transaction!
await db_session.commit() # ← Atomic!
# Worker (uses FastStream internally)
class Worker(OutboxPublisherBase):
async def schedule_publishing(self):
while True:
await self.publish_batch() # ← Publishes to FastStream/Kafka
await asyncio.sleep(5)
🚨 The Dual-Write Problem¶
Without Outbox (Using FastStream/Huey directly):¶
async def create_user(cmd):
# 1. Write to database
user = await user_repo.create(cmd.to_entity())
await db_session.commit()
# 2. Publish event
await faststream_broker.publish(UserCreated(...)) # ❌ NOT ATOMIC!
# What if:
# - Kafka is down? Event lost!
# - App crashes here? Event lost!
# - Network timeout? Event lost!
Result: Your database and event stream are out of sync!
With Outbox (Using python-outbox-core + FastStream):¶
async def create_user(cmd):
# 1. Write to database AND outbox table
user = await user_repo.create(cmd.to_entity())
event = UserCreated(user_id=user.id)
await outbox_repo.add_event(event) # ← Outbox table (same DB)
await db_session.commit() # ← ATOMIC (both or neither)
# 2. Background worker publishes from outbox
# (Runs separately, retries on failure)
Result: Guaranteed consistency! Event never lost.
💡 How They Work Together¶
┌─────────────────────────────────────────────────────────┐
│ python-outbox-core │
│ ├─ IOutboxEvent (defines event structure) │
│ ├─ IOutboxRepository (stores to outbox table) │
│ └─ OutboxPublisherBase (worker logic) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ FastStream (by ag2ai) │
│ └─ KafkaBroker (publishes from outbox to Kafka) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Kafka │
│ └─ Event topics │
└─────────────────────────────────────────────────────────┘
Or with Celery/Huey:
┌─────────────────────────────────────────────────────────┐
│ python-outbox-core │
│ └─ OutboxPublisherBase (worker base class) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Celery/Huey (task scheduler) │
│ └─ Schedules outbox polling task every 5s │
└─────────────────────────────────────────────────────────┘
🎯 Why You Can’t Replace python-outbox-core¶
FastStream CANNOT:¶
❌ Store events in your PostgreSQL database
❌ Make event publishing atomic with DB writes
❌ Provide retry logic for failed publishes
❌ Implement outbox pattern abstractions
Celery/Huey CANNOT:¶
❌ Integrate with your SQLAlchemy models
❌ Provide event-specific abstractions (IOutboxEvent)
❌ Handle CloudEvents metadata
❌ Work with Kafka/FastStream directly
python-outbox-core PROVIDES:¶
✅ Transactional safety (DB + outbox in one transaction)
✅ Domain event contracts (IOutboxEvent with CloudEvents)
✅ Repository pattern (IOutboxRepository for SQLAlchemy)
✅ Worker template (OutboxPublisherBase you extend)
✅ Integrations (connects to FastStream/Huey/Celery)
📋 Existing Solutions?¶
We checked: There are NO mature Python libraries for the transactional outbox pattern.
Alternatives:
pg_eventstore - Focuses on event sourcing, not outbox
Custom implementations - Everyone rolls their own
Java libraries - Debezium (CDC), but not for Python
python-outbox-core fills a real gap! 🎯
🚀 Recommendation¶
Keep python-outbox-core + Use FastStream¶
Your stack:
# Domain layer (your code)
event = UserCreated(...)
await outbox_repo.add_event(event) # python-outbox-core
await db_session.commit()
# Infrastructure layer (python-outbox-core)
class Worker(OutboxPublisherBase): # python-outbox-core base
def __init__(self, faststream_publisher): # FastStream integration
super().__init__(repo, faststream_publisher)
async def schedule_publishing(self):
# Option A: Simple loop
while True:
await self.publish_batch()
await asyncio.sleep(5)
# Option B: Huey task
@huey.periodic_task(crontab(minute='*/1'))
async def publish_outbox():
await self.publish_batch()
✅ Final Answer¶
Question |
Answer |
|---|---|
Can FastStream replace python-outbox-core? |
❌ NO - FastStream is for messaging, not transactional safety |
Can Celery/Huey replace python-outbox-core? |
❌ NO - Celery/Huey is for task scheduling, not outbox pattern |
Do you need python-outbox-core? |
✅ YES - It’s the only way to guarantee atomic DB + events |
How do they work together? |
✅ python-outbox-core stores events, FastStream publishes them, Celery/Huey (optional) schedules workers |
python-outbox-core is essential - it’s the foundation that makes FastStream/Celery work reliably! 🏗️
Serialization Notes¶
❓ Why No Custom Serializer?¶
Short answer: Pydantic v2 and FastStream already handle everything.
🔍 What Each Technology Does¶
✅ Pydantic v2 (Built-in)¶
# Serialize to JSON string
json_str = event.model_dump_json() # Handles UUID, datetime, etc.
# Deserialize from JSON string
event = UserCreatedEvent.model_validate_json(json_str)
Handles:
UUID → string
datetime → ISO 8601
Decimal → string (Pydantic v2 JSON mode, preserves precision)
Nested models
Field validation
✅ FastStream (Auto-Serializes)¶
from faststream.kafka import KafkaBroker
broker = KafkaBroker()
# FastStream auto-converts dict → JSON!
await broker.publish(
{"user_id": "123", "email": "test@example.com"},
topic="events"
)
Handles:
dict → JSON string
Kafka message encoding
Content-Type headers
⚠️ When You DO Need Serialization¶
Only for PostgreSQL JSONB storage:
# Store to DB
payload = event.model_dump_json() # Pydantic built-in
orm = OutboxEventORM(id=event.event_id, payload=payload)
session.add(orm)
# Load from DB
event = UserCreatedEvent.model_validate_json(orm.payload) # Pydantic built-in
🎯 Pattern Summary¶
Command Handler
↓
event.model_dump_json() ← Store to PostgreSQL
↓
PostgreSQL JSONB
↓
EventClass.model_validate_json(json_str) ← Load from DB
↓
event.to_message() → dict
↓
broker.publish(dict) ← FastStream auto-serializes!
↓
Kafka (JSON)
🚨 Common Mistakes¶
❌ Don’t do this:¶
# BAD: Manual JSON encoding
import json
payload = json.dumps(event.dict()) # Breaks UUID, datetime!
# BAD: Custom serializer
class MySerializer:
def serialize(self, event): ... # Why reinvent Pydantic?
✅ Do this:¶
# GOOD: Use Pydantic
payload = event.model_dump_json()
event = EventClass.model_validate_json(payload)
📋 Serialization Matrix¶
Operation |
Tool |
Method |
|---|---|---|
Event → PostgreSQL |
Pydantic |
`model_dump_json()` |
PostgreSQL → Event |
Pydantic |
`model_validate_json()` |
Event → Kafka |
FastStream |
Auto (just pass dict) |
Kafka → Event |
FastStream |
Auto (handler receives dict) |
🎯 Best Practices¶
PostgreSQL Storage: Always use `model_dump_json()` / `model_validate_json()`
Kafka Publishing: Let FastStream handle serialization (just pass `dict`)
Custom Types: Define Pydantic validators, not custom serializers
Testing: Mock with dicts, not JSON strings
🔧 Event Registry Pattern¶
For deserializing from DB, you need an event type registry:
class EventRegistry:
"""Maps event_type string → Event class."""
_registry: dict[str, type[IOutboxEvent]] = {
"com.app.user.created": UserCreatedEvent,
"com.app.user.deleted": UserDeletedEvent,
}
@classmethod
def get(cls, event_type: str) -> type[IOutboxEvent]:
return cls._registry[event_type]
# In Repository
def _deserialize(self, json_str: str) -> IOutboxEvent:
data = json.loads(json_str)
event_class = EventRegistry.get(data["event_type"])
return event_class.model_validate_json(json_str)
Bottom line: No custom serializer needed. Pydantic + FastStream = complete solution. ✅