python-outbox-core

Core abstractions for the transactional outbox pattern (Kafka, FastStream, RabbitMQ).

Installation

pip install python-outbox-core

Public API

| Class | Purpose |
| --- | --- |
| IOutboxEvent | Abstract event base (Pydantic) |
| IOutboxRepository | Persistence interface |
| IEventPublisher | Broker publishing interface |
| OutboxPublisherBase | Reusable worker logic |
| OutboxConfig | Worker configuration |
| OutboxErrorHandler | Per-event error handling |
| OutboxMetrics | Structured logging metrics |
| CloudEventsFormatter | CloudEvents 1.0 formatting |
| OutboxHealthCheck | Health check interface |

Quick reference

Outbox Core - Quick Reference

NOTE: No custom serializer is needed. Use Pydantic’s model_dump_json() and model_validate_json(); FastStream serializes dicts to JSON automatically.

💡 RECOMMENDED: FastStream (ag2ai) as the broker framework for event-driven systems; it is a good fit for startups. A custom serializer can still be plugged in if you need one.

API

# Event
class IOutboxEvent(BaseModel, ABC):
    event_id: UUID; event_type: str; aggregate_id: str
    occurred_at: datetime; source: str; data_version: str = "1.0"
    @abstractmethod
    def to_message(self) -> dict: ...

# Repository
class IOutboxRepository(ABC):
    async def add_event(self, event) -> None: ...
    async def get_unpublished(self, limit=100) -> List[IOutboxEvent]: ...
    async def mark_published(self, event_id: UUID) -> None: ...
    async def mark_failed(self, event_id: UUID, error_message: str) -> None: ...

# Publisher
class IEventPublisher(ABC):
    async def publish(self, message: dict) -> None: ...  # FastStream handles JSON!

# Worker
class OutboxPublisherBase(ABC):
    async def publish_batch(self, limit=100) -> int: ...
    @abstractmethod
    async def schedule_publishing(self) -> None: ...

Usage

# 1. Define
class UserCreated(IOutboxEvent):
    user_id: UUID; email: str
    event_type: str = "user.created"; source: str = "api"
    def to_message(self): return {"id": str(self.event_id), "data": {...}}

# 2. Store (atomic with domain entity)
await outbox_repo.add_event(UserCreated(...))
await db_session.commit()

# 3. Worker
class Worker(OutboxPublisherBase):
    async def schedule_publishing(self):
        while True: await self.publish_batch(); await asyncio.sleep(5)

# 4. Repository (SQLAlchemy)
class Repo(IOutboxRepository):
    async def add_event(self, e):
        self.session.add(OutboxORM(
            id=e.event_id,
            payload=e.model_dump_json()  # Pydantic built-in!
        ))
    async def get_unpublished(self, limit=100):
        result = await self.session.execute(
            select(OutboxORM).where(OutboxORM.published==False).limit(limit)
        )
        return [EventClass.model_validate_json(orm.payload) for orm in result.scalars()]

# 5. Publisher (Kafka via FastStream)
class KafkaPub(IEventPublisher):
    async def publish(self, msg):
        # FastStream auto-serializes dict → JSON!
        await self.broker.publish(msg, topic=msg["type"])

Config (Env)

OUTBOX_BATCH_SIZE=100
OUTBOX_POLL_INTERVAL_SECONDS=5

Import

from outbox_sdk.core import (
    IOutboxEvent, IOutboxRepository, IEventPublisher,
    OutboxPublisherBase, OutboxConfig, OutboxErrorHandler, OutboxMetrics
)

Serialization Notes

No custom serializer needed!

  • DB Storage: Use event.model_dump_json() (Pydantic v2)

  • DB Loading: Use EventClass.model_validate_json(json_str) (Pydantic v2)

  • Kafka Publishing: FastStream auto-serializes dict → JSON

Pattern

Handler → DB(entity + outbox) → Worker → Kafka
         └──── ATOMIC ────┘

✅ Atomic | ✅ At-least-once | ✅ No loss | ✅ DLQ

Detailed guides

Outbox Core - API Quick Reference

Version: 0.1.0 | Status: ✅ Production Ready


📦 Installation

pip install outbox-sdk
# or
poetry add outbox-sdk

🔌 Core API

Events

from outbox_sdk.core import IOutboxEvent

class IOutboxEvent(BaseModel, ABC):
    # Required
    event_id: UUID
    event_type: str                   # "com.company.domain.action"
    aggregate_id: str
    occurred_at: datetime
    source: str                       # Service name

    # Versioning
    data_version: str = "1.0"

    # Tracing (optional, so both default to None)
    correlation_id: UUID | None = None
    causation_id: UUID | None = None

    @abstractmethod
    def to_message(self) -> dict[str, Any]:
        """Convert to broker format."""

Repository

from outbox_sdk.core import IOutboxRepository

class IOutboxRepository(ABC):
    async def add_event(self, event: IOutboxEvent) -> None: ...
    async def get_unpublished(self, limit: int = 100, offset: int = 0) -> List[IOutboxEvent]: ...
    async def mark_published(self, event_id: UUID) -> None: ...
    async def mark_failed(self, event_id: UUID, error_message: str) -> None: ...
    async def count_unpublished(self) -> int: ...

Publisher

from outbox_sdk.core import IEventPublisher

class IEventPublisher(ABC):
    async def publish(self, message: dict[str, Any]) -> None: ...

Worker

from outbox_sdk.core import OutboxPublisherBase

class OutboxPublisherBase(ABC):
    def __init__(
        self,
        repository: IOutboxRepository,
        publisher: IEventPublisher,
        error_handler: OutboxErrorHandler | None = None,
        metrics: OutboxMetrics | None = None,
    ): ...

    async def publish_batch(self, limit: int = 100) -> int: ...

    @abstractmethod
    async def schedule_publishing(self) -> None: ...

Error Handling

from outbox_sdk.core import OutboxErrorHandler

class OutboxErrorHandler:
    def __init__(self, logger: Any = None, max_retries: int = 3): ...

    def handle(self, event: Any, exception: Exception) -> None: ...
    def should_retry(self, event: Any, exception: Exception, attempt: int) -> bool: ...
    def is_transient_error(self, exception: Exception) -> bool: ...
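
The retry decision implied by should_retry() and is_transient_error() can be pictured with a small standalone sketch. This is not the library's implementation: the class name SketchErrorHandler, the choice of which exceptions count as transient, and the backoff formula are all assumptions made for illustration.

```python
class SketchErrorHandler:
    """Hypothetical stand-in; not the library's OutboxErrorHandler."""

    # Assumption: network-style failures are worth retrying.
    TRANSIENT = (ConnectionError, TimeoutError)

    def __init__(self, max_retries: int = 3, backoff_multiplier: float = 2.0):
        self.max_retries = max_retries
        self.backoff_multiplier = backoff_multiplier

    def is_transient_error(self, exception: Exception) -> bool:
        return isinstance(exception, self.TRANSIENT)

    def should_retry(self, exception: Exception, attempt: int) -> bool:
        # Retry only transient errors, and only while attempts remain.
        return self.is_transient_error(exception) and attempt < self.max_retries

    def backoff_seconds(self, attempt: int, base: float = 1.0) -> float:
        # Exponential backoff: base * multiplier ** attempt.
        return base * self.backoff_multiplier ** attempt


handler = SketchErrorHandler(max_retries=3)
```

Under these assumptions a broker outage is retried until the attempt counter reaches max_retries, while a validation error goes straight to mark_failed().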

Metrics

from outbox_sdk.core import OutboxMetrics

class OutboxMetrics:
    def __init__(self, logger: Any = None): ...

    def log_no_events(self) -> None: ...
    def log_success(self, event: Any) -> None: ...
    def log_batch_complete(self, published: int, total: int) -> None: ...
    def log_batch_started(self, batch_size: int) -> None: ...

Configuration

from outbox_sdk.core import OutboxConfig

class OutboxConfig(BaseModel):
    batch_size: int = 100                  # 1-1000
    poll_interval_seconds: int = 5         # 1-3600
    max_retry_count: int = 3               # 0-100
    retry_backoff_multiplier: float = 2.0  # 1.0-10.0
    enable_metrics: bool = True
    enable_health_check: bool = True

    # Env vars: OUTBOX_BATCH_SIZE, OUTBOX_POLL_INTERVAL_SECONDS, etc.

Health Checks

from outbox_sdk.core import OutboxHealthCheck, HealthStatus

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

class OutboxHealthCheck(ABC):
    async def check_health(self) -> Dict[str, Any]: ...
    async def check_database(self) -> Dict[str, Any]: ...
    async def check_broker(self) -> Dict[str, Any]: ...
    async def check_outbox_lag(self) -> Dict[str, Any]: ...

Serialization

NOTE: No custom serializer is needed.

💡 RECOMMENDED: FastStream (ag2ai), an event-driven framework that suits startups well. Custom serializers are supported if needed.

In most cases, just use Pydantic v2's built-in methods:

# Serialize to JSON string (for PostgreSQL JSONB)
json_str = event.model_dump_json()

# Deserialize from JSON string
event = UserCreatedEvent.model_validate_json(json_str)

# For Kafka: FastStream auto-serializes dicts!
await broker.publish(event.to_message())  # No manual JSON needed

🚀 Quick Start

1. Define Event

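from datetime import datetime, timezone
from uuid import UUID, uuid4

from outbox_sdk.core import IOutboxEvent
from pydantic import Field, model_validator

class UserCreatedEvent(IOutboxEvent):
    user_id: UUID
    email: str

    event_id: UUID = Field(default_factory=uuid4)
    event_type: str = "com.app.user.created"
    occurred_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    source: str = "user-service"

    @model_validator(mode="before")
    @classmethod
    def _default_aggregate_id(cls, values):
        # aggregate_id (declared on IOutboxEvent) defaults to the user's ID as a string.
        if isinstance(values, dict) and "aggregate_id" not in values and "user_id" in values:
            values = {**values, "aggregate_id": str(values["user_id"])}
        return values

    def to_message(self) -> dict:
        return {
            "id": str(self.event_id),
            "type": self.event_type,
            "aggregate_id": self.aggregate_id,  # used as the Kafka message key
            "data": {"user_id": str(self.user_id), "email": self.email}
        }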

2. Store in Transaction

async def create_user(cmd, user_repo, outbox_repo, db_session):
    user = await user_repo.create(cmd.to_entity())
    event = UserCreatedEvent(user_id=user.id, email=user.email)
    await outbox_repo.add_event(event)
    await db_session.commit()  # Atomic!

3. Implement Worker

from outbox_sdk.core import OutboxPublisherBase
import asyncio

class MyWorker(OutboxPublisherBase):
    async def schedule_publishing(self):
        while True:
            await self.publish_batch(limit=100)
            await asyncio.sleep(5)

4. Implement Repository

from datetime import datetime, timezone

from outbox_sdk.core import IOutboxRepository
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

class SQLAlchemyOutboxRepo(IOutboxRepository):
    def __init__(self, session: AsyncSession, event_registry: dict):
        self.session = session
        self.event_registry = event_registry  # maps event_type -> event class

    async def add_event(self, event):
        orm_event = OutboxEventORM(
            id=event.event_id,
            event_type=event.event_type,
            payload=event.model_dump_json(),  # Pydantic built-in!
            # ...
        )
        self.session.add(orm_event)
        # No commit - caller handles!

    async def get_unpublished(self, limit=100, offset=0):
        result = await self.session.execute(
            select(OutboxEventORM)
            .where(OutboxEventORM.published.is_(False))
            .order_by(OutboxEventORM.occurred_at.asc())
            .limit(limit)
            .offset(offset)
        )
        # Deserialize using Pydantic built-in
        events = []
        for orm in result.scalars().all():
            event_class = self.event_registry[orm.event_type]
            events.append(event_class.model_validate_json(orm.payload))
        return events

    async def mark_published(self, event_id):
        await self.session.execute(
            update(OutboxEventORM)
            .where(OutboxEventORM.id == event_id)
            .values(published=True, published_at=datetime.now(timezone.utc))
        )

    async def mark_failed(self, event_id, error_message):
        await self.session.execute(
            update(OutboxEventORM)
            .where(OutboxEventORM.id == event_id)
            .values(failed=True, error_message=error_message)
        )

    async def count_unpublished(self):
        result = await self.session.execute(
            select(func.count(OutboxEventORM.id))
            .where(OutboxEventORM.published.is_(False))
        )
        return result.scalar()

5. Implement Publisher

from outbox_sdk.core import IEventPublisher
from faststream.kafka import KafkaBroker

class KafkaPublisher(IEventPublisher):
    def __init__(self, broker: KafkaBroker):
        self.broker = broker

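    async def publish(self, message: dict):
        # NOTE: FastStream auto-serializes dict → JSON!
        key = message.get("aggregate_id")
        await self.broker.publish(
            message,
            topic=message.get("type"),
            # Kafka keys are raw bytes; the key stays None if the message has no aggregate_id
            key=key.encode() if key else None,
        )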

🎯 Full Import Example

from outbox_sdk.core import (
    # Contracts
    IOutboxEvent,
    IOutboxRepository,
    IEventPublisher,

    # Worker
    OutboxPublisherBase,
    OutboxErrorHandler,
    OutboxMetrics,

    # Utils
    OutboxConfig,

    # Health
    OutboxHealthCheck,
    HealthStatus,
)

# NOTE: No OutboxSerializer - use Pydantic's model_dump_json() instead!

📊 Architecture

┌─────────────────────────────────────────────────────────┐
│ Command Handler                                         │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. Create domain entity                             │ │
│ │ 2. Save entity (user_repo.save)                     │ │
│ │ 3. Create event (UserCreatedEvent)                  │ │
│ │ 4. Store event (outbox_repo.add_event)             │ │
│ │ 5. db_session.commit() ← ATOMIC!                    │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Database (PostgreSQL)                                   │
│ ┌──────────────┐  ┌──────────────────────────────────┐ │
│ │ users table  │  │ outbox_events table              │ │
│ │ (domain)     │  │ - id, event_type, payload        │ │
│ │              │  │ - published (false → true)       │ │
│ └──────────────┘  └──────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Background Worker (OutboxPublisherBase)                 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Every 5s:                                           │ │
│ │ 1. Fetch unpublished (outbox_repo.get_unpublished) │ │
│ │ 2. Publish to Kafka (publisher.publish)            │ │
│ │ 3. Mark published (outbox_repo.mark_published)     │ │
│ │ 4. Handle errors (error_handler.handle)            │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Kafka / Message Broker                                  │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Topic: com.app.user.created                         │ │
│ │ Message: {"id": "...", "data": {...}}               │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Consumers (Other Services)                              │
└─────────────────────────────────────────────────────────┘
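
The worker box in the diagram (fetch, publish, mark published, handle errors) can be sketched with in-memory stand-ins. This is an assumption about how such a loop might look, not the actual body of OutboxPublisherBase.publish_batch(); MemoryRepo, flaky_publish and the dict-shaped events are hypothetical names used only here.

```python
import asyncio


class MemoryRepo:
    """In-memory stand-in for an outbox repository (hypothetical)."""

    def __init__(self, events):
        self.pending = list(events)
        self.published, self.failed = [], {}

    async def get_unpublished(self, limit=100):
        return self.pending[:limit]

    async def mark_published(self, event_id):
        self.pending = [e for e in self.pending if e["id"] != event_id]
        self.published.append(event_id)

    async def mark_failed(self, event_id, error):
        self.failed[event_id] = error


async def publish_batch(repo, publish, limit=100):
    """Fetch, publish, then mark; a failure leaves the event pending."""
    count = 0
    for event in await repo.get_unpublished(limit):
        try:
            await publish(event)
        except Exception as exc:  # record the error and move on to the next event
            await repo.mark_failed(event["id"], str(exc))
            continue
        await repo.mark_published(event["id"])  # only after the broker accepted it
        count += 1
    return count


async def flaky_publish(event):
    if event["id"] == "e2":
        raise ConnectionError("broker unavailable")


repo = MemoryRepo([{"id": "e1"}, {"id": "e2"}, {"id": "e3"}])
published = asyncio.run(publish_batch(repo, flaky_publish))
```

Marking an event as published only after the broker call returns is what gives at-least-once delivery: a crash between the two steps causes a re-send, never a silent drop.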

✅ Key Guarantees

| Guarantee | How? |
| --- | --- |
| Atomicity | Domain entity + outbox event in same DB transaction |
| At-least-once delivery | Events persisted before publishing |
| No message loss | Events durable in DB, retried on failure |
| Idempotency | event_id as idempotency key |
| FIFO ordering | Per-aggregate via occurred_at ASC |
| Dead Letter Queue | mark_failed() for permanent failures |
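
Because delivery is at-least-once, a consumer may see the same event twice. A minimal sketch of using event_id as the idempotency key on the consumer side follows; IdempotentConsumer is a hypothetical name, and the in-memory set stands in for whatever durable store a real consumer would use.

```python
class IdempotentConsumer:
    """Skips events whose event_id was already processed."""

    def __init__(self):
        self.seen_ids = set()   # in production this would be a durable store
        self.handled = []

    def handle(self, message: dict) -> bool:
        event_id = message["id"]
        if event_id in self.seen_ids:
            return False        # duplicate delivery: ignore
        self.handled.append(message["data"])
        self.seen_ids.add(event_id)
        return True


consumer = IdempotentConsumer()
msg = {"id": "550e8400-e29b-41d4-a716-446655440000", "data": {"email": "a@example.com"}}
first = consumer.handle(msg)
second = consumer.handle(msg)  # redelivered after a worker retry
```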


🔧 Configuration via Environment

export OUTBOX_BATCH_SIZE=200
export OUTBOX_POLL_INTERVAL_SECONDS=10
export OUTBOX_MAX_RETRY_COUNT=5
export OUTBOX_ENABLE_METRICS=true

from outbox_sdk.core import OutboxConfig

config = OutboxConfig()  # Auto-loads from env
print(config.batch_size)  # 200
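
How environment variables could map onto validated settings is sketched below with the standard library only. SketchConfig and _env_int are hypothetical names; the real OutboxConfig may load and validate its values differently. The ranges are the ones listed in the Configuration section.

```python
import os
from dataclasses import dataclass


def _env_int(name: str, default: int, low: int, high: int, env=os.environ) -> int:
    """Read an integer from the environment and enforce its allowed range."""
    value = int(env.get(name, default))
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside {low}-{high}")
    return value


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for OutboxConfig."""
    batch_size: int
    poll_interval_seconds: int
    max_retry_count: int

    @classmethod
    def from_env(cls, env=os.environ) -> "SketchConfig":
        return cls(
            batch_size=_env_int("OUTBOX_BATCH_SIZE", 100, 1, 1000, env),
            poll_interval_seconds=_env_int("OUTBOX_POLL_INTERVAL_SECONDS", 5, 1, 3600, env),
            max_retry_count=_env_int("OUTBOX_MAX_RETRY_COUNT", 3, 0, 100, env),
        )


# A plain dict stands in for os.environ so the example is reproducible.
config = SketchConfig.from_env({"OUTBOX_BATCH_SIZE": "200", "OUTBOX_POLL_INTERVAL_SECONDS": "10"})
```

Unset variables fall back to the documented defaults, and an out-of-range value fails at startup instead of at the first poll.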

📈 Monitoring

Health Check Endpoint

from fastapi import FastAPI
from outbox_sdk.core import OutboxHealthCheck, HealthStatus

class MyHealthCheck(OutboxHealthCheck):
    async def check_health(self):
        return {
            "status": HealthStatus.HEALTHY,
            "checks": {
                "database": await self.check_database(),
                "broker": await self.check_broker(),
                "outbox_lag": await self.check_outbox_lag(),
            }
        }

app = FastAPI()
health_check = MyHealthCheck()

@app.get("/health")
async def health():
    return await health_check.check_health()
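
The endpoint above always reports HEALTHY. One way to derive the overall status from the individual checks is sketched here; the backlog thresholds and the helper names lag_status and overall_status are assumptions, and HealthStatus is redefined only to keep the block self-contained.

```python
from enum import Enum


class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


# Ordered from best to worst so the index doubles as a severity score.
_SEVERITY = [HealthStatus.HEALTHY, HealthStatus.DEGRADED, HealthStatus.UNHEALTHY]


def lag_status(unpublished: int, warn_at: int = 1000, fail_at: int = 10000) -> HealthStatus:
    """Classify outbox backlog size; thresholds are illustrative assumptions."""
    if unpublished >= fail_at:
        return HealthStatus.UNHEALTHY
    if unpublished >= warn_at:
        return HealthStatus.DEGRADED
    return HealthStatus.HEALTHY


def overall_status(checks: dict) -> HealthStatus:
    """The worst component status wins."""
    return max((c["status"] for c in checks.values()), key=_SEVERITY.index)


checks = {
    "database": {"status": HealthStatus.HEALTHY},
    "broker": {"status": HealthStatus.HEALTHY},
    "outbox_lag": {"status": lag_status(2500), "unpublished": 2500},
}
report = {"status": overall_status(checks), "checks": checks}
```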

Structured Logs

{
  "event": "outbox.event_published",
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "com.app.user.created",
  "aggregate_id": "user-123",
  "timestamp": "2024-01-01T12:00:00Z"
}

{
  "event": "outbox.batch_complete",
  "published": 95,
  "total": 100,
  "success_rate": 0.95,
  "timestamp": "2024-01-01T12:00:05Z"
}
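
A record shaped like the batch_complete example can be produced with the standard library alone. The helper name batch_complete_record is hypothetical, and reporting an empty batch as a success rate of 1.0 is an assumption; OutboxMetrics may format its output differently.

```python
import json
from datetime import datetime, timezone


def batch_complete_record(published: int, total: int, now: datetime) -> str:
    """Build one JSON log line for a finished batch."""
    record = {
        "event": "outbox.batch_complete",
        "published": published,
        "total": total,
        # Avoid dividing by zero when the batch was empty.
        "success_rate": round(published / total, 2) if total else 1.0,
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return json.dumps(record)


line = batch_complete_record(95, 100, datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc))
```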

🧪 Testing

Mock Repository

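class InMemoryOutboxRepo(IOutboxRepository):
    def __init__(self):
        self.events, self.published_ids, self.failed = [], set(), {}

    async def add_event(self, event):
        self.events.append(event)

    async def get_unpublished(self, limit=100, offset=0):
        # Events carry no `published` flag, so published IDs are tracked here.
        pending = [e for e in self.events if e.event_id not in self.published_ids]
        return pending[offset:offset + limit]

    async def mark_published(self, event_id):
        self.published_ids.add(event_id)

    async def mark_failed(self, event_id, error_message):
        self.failed[event_id] = error_message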

Mock Publisher

class FakePublisher(IEventPublisher):
    def __init__(self):
        self.published_messages = []

    async def publish(self, message):
        self.published_messages.append(message)

Unit Test

import pytest

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_outbox_worker():
    repo = InMemoryOutboxRepo()
    publisher = FakePublisher()
    worker = MyWorker(repo, publisher)

    await repo.add_event(UserCreatedEvent(...))

    published_count = await worker.publish_batch()

    assert published_count == 1
    assert len(publisher.published_messages) == 1


Lines of Code: 732 total | Files: 9 | Avg: 81 lines/file

Transactional Outbox Implementation Guide

✅ Pre-Implementation Checklist

  • [ ] Identify domain events that need reliable publishing

  • [ ] Choose message broker (Kafka strongly recommended)

  • [ ] Decide on worker scheduling strategy

  • [ ] Plan monitoring & alerting approach

📋 Step-by-Step Implementation

Phase 1: Core Setup

  • [ ] Install python-outbox-core library

  • [ ] Create ORM model for outbox table (mirroring the IOutboxEvent fields)

  • [ ] Implement IOutboxRepository with SQLAlchemy

  • [ ] Add database migration for outbox table

Phase 2: Event Publishing

  • [ ] Define domain events (inherit from IOutboxEvent)

  • [ ] Implement to_message() for each event

  • [ ] Update command handlers to write events to outbox

Phase 3: Worker Setup

  • [ ] Implement IEventPublisher for your broker (Kafka strongly recommended)

  • [ ] Create worker class extending OutboxPublisherBase

  • [ ] Implement schedule_publishing() method

  • [ ] Configure worker startup/shutdown

Phase 4: Monitoring

  • [ ] Add metrics for outbox lag (count_unpublished())

  • [ ] Set up alerts for publishing failures

  • [ ] Monitor batch processing performance

  • [ ] Add health checks

🔧 Integration Patterns

Pattern A: FastAPI (lifespan) + Kafka + FastStream + Celery + Kong Events Gate

// TODO: (code example)

🔄 Lifespan implementation

What Is Lifespan?

Lifespan is FastAPI’s way to run code before your app starts and after it shuts down. It’s like “hooks” for startup/shutdown.

Your Current Lifespan (app_factory.py)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events."""
    # ⬇️ STARTUP - runs ONCE when FastAPI starts
    logger.info("GridFlow API starting up")
    await init_db()  # Initialize database connection pool
    logger.info("Database initialized successfully")

    yield  # ← App is now running and serving requests

    # ⬇️ SHUTDOWN - runs ONCE when FastAPI stops
    logger.info("GridFlow API shutting down")

Flow:

1. Server starts (uvicorn run)
   ↓
2. Lifespan STARTUP runs
   - Initialize database
   - Log startup
   ↓
3. yield ← App is now ready
   ↓
4. App serves requests... (your API endpoints work)
   ↓
5. Server stops (Ctrl+C or SIGTERM; a hard crash skips step 6)
   ↓
6. Lifespan SHUTDOWN runs
   - Cleanup resources
   - Log shutdown

🎯 Where Outbox Worker Fits

You need to start the outbox worker in the lifespan startup!

Current (Without Outbox):

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("GridFlow API starting up")
    await init_db()
    yield
    # Shutdown
    logger.info("GridFlow API shutting down")

Updated (With Outbox Worker):

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("GridFlow API starting up")
    await init_db()

    # 🆕 Start outbox worker as background task
    outbox_worker = create_outbox_worker()  # Your worker instance
    worker_task = asyncio.create_task(outbox_worker.schedule_publishing())
    logger.info("Outbox worker started")

    yield  # App is running

    # Shutdown
    logger.info("GridFlow API shutting down")

    # 🆕 Stop outbox worker gracefully
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        logger.info("Outbox worker stopped")

📊 Complete Flow with Outbox

┌─────────────────────────────────────────────────────────┐
│ 1. FastAPI Server Starts (uvicorn)                     │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ 2. Lifespan STARTUP                                     │
│    ├─ Initialize database (init_db)                     │
│    ├─ Create outbox worker                             │
│    └─ Start background task (asyncio.create_task)      │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ 3. App Running (yield)                                  │
│                                                         │
│    ┌─────────────────────────────────────────────┐    │
│    │ Main FastAPI Process                        │    │
│    │ - Handles HTTP requests                     │    │
│    │ - Command handlers store events to outbox   │    │
│    └─────────────────────────────────────────────┘    │
│                                                         │
│    ┌─────────────────────────────────────────────┐    │
│    │ Background Outbox Worker (asyncio task)     │    │
│    │ - Polls outbox table every 5s               │    │
│    │ - Publishes events to Kafka                 │    │
│    │ - Marks events as published                 │    │
│    │ - Runs in parallel with FastAPI             │    │
│    └─────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ 4. Server Stops (Ctrl+C)                               │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ 5. Lifespan SHUTDOWN                                    │
│    ├─ Cancel worker task                               │
│    ├─ Wait for graceful stop                           │
│    └─ Close database connections                       │
└─────────────────────────────────────────────────────────┘

💡 Why Use Lifespan for Outbox Worker?



🔧 Your Implementation Options

Option 1: Simple Lifespan Worker (Start Here)

# backend/src/infrastructure/app_factory.py

from contextlib import asynccontextmanager
import asyncio
from backend.src.infrastructure.messaging.outbox_worker import create_outbox_worker

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan with outbox worker."""
    # Startup
    logger.info("GridFlow API starting up")
    await init_db()

    # Start outbox worker
    worker = create_outbox_worker()
    worker_task = asyncio.create_task(worker.schedule_publishing())
    logger.info("Outbox worker started in background")

    yield  # App is running

    # Shutdown
    logger.info("GridFlow API shutting down")

    # Stop worker gracefully
    worker_task.cancel()
    try:
        await asyncio.wait_for(worker_task, timeout=10.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        logger.info("Outbox worker stopped")
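
Cancelling the task can interrupt a batch halfway through. An alternative worth considering is a cooperative stop signal that lets the current batch finish before the loop exits. The sketch below uses plain asyncio; LoopWorker and its stop() method are hypothetical and not part of OutboxPublisherBase.

```python
import asyncio


class LoopWorker:
    """Hypothetical polling worker with a cooperative stop signal."""

    def __init__(self, poll_interval: float = 0.01):
        self.poll_interval = poll_interval
        self.batches = 0
        self._stop = asyncio.Event()

    async def publish_batch(self) -> None:
        self.batches += 1  # stand-in for fetch -> publish -> mark published

    async def schedule_publishing(self) -> None:
        while not self._stop.is_set():
            await self.publish_batch()
            try:
                # Sleep until the next poll, but wake immediately on stop().
                await asyncio.wait_for(self._stop.wait(), timeout=self.poll_interval)
            except asyncio.TimeoutError:
                pass

    def stop(self) -> None:
        self._stop.set()


async def main() -> int:
    worker = LoopWorker()
    task = asyncio.create_task(worker.schedule_publishing())
    await asyncio.sleep(0.05)   # the app "serves requests" here
    worker.stop()               # shutdown: ask the loop to finish
    await asyncio.wait_for(task, timeout=1.0)
    return worker.batches


batches = asyncio.run(main())
```

In a lifespan function, worker.stop() would replace worker_task.cancel(), with cancellation kept as a fallback if the timeout expires.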

Option 2: Celery Worker (Later/Production)

# backend/src/infrastructure/app_factory.py
# (NO changes to lifespan - Celery runs separately)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan - Celery worker runs separately."""
    # Startup
    logger.info("GridFlow API starting up")
    await init_db()
    yield
    # Shutdown
    logger.info("GridFlow API shutting down")

# Separate Celery worker process:
# celery -A backend.celery_worker worker --loglevel=info
# celery -A backend.celery_worker beat --loglevel=info

Simple startup integration:

Start with Option 1 (Lifespan)

Why:

  • ✅ Simpler to implement initially

  • ✅ No Celery infrastructure needed yet

  • ✅ Good for testing outbox pattern

  • ✅ Can migrate to Celery later without changing core logic

Later, move to Option 2 (Celery) when:

  • You need to scale (multiple workers)

  • Event volume is high

  • You need worker monitoring

  • You already use Celery for other tasks


📋 What You Need to Add

  1. Outbox worker factory:

# backend/src/infrastructure/messaging/outbox_worker.py

import asyncio

from outbox_sdk.core import OutboxPublisherBase
from outbox_sdk.integrations.faststream import FastStreamKafkaPublisher

class GridFlowOutboxWorker(OutboxPublisherBase):
    async def schedule_publishing(self):
        """Simple polling loop."""
        while True:
            await self.publish_batch(limit=100)
            await asyncio.sleep(5)

def create_outbox_worker() -> GridFlowOutboxWorker:
    """Factory to create configured worker."""
    repo = get_outbox_repository()  # Your SQLAlchemy repo
    publisher = FastStreamKafkaPublisher(kafka_broker)  # Your Kafka publisher
    return GridFlowOutboxWorker(repo, publisher)

  2. Update lifespan (as shown above)

  3. Done! Worker runs automatically when FastAPI starts.


TL;DR: Lifespan = FastAPI’s startup/shutdown hooks. You’ll start your outbox worker there as a background task, running in parallel with your API. 🚀

FAQ

Do we even need python-outbox-core if we use FastStream? Could FastStream (by ag2ai) replace it on its own? Or Celery, or Huey (by coleifer)?

TL;DR: they all solve DIFFERENT problems.

📊 What Each Tool Does

| Tool | Purpose | What It Provides |
| --- | --- | --- |
| FastStream | Message broker framework | Kafka/RabbitMQ abstraction, serialization, async consumers |
| Celery/Huey | Task queue | Background job scheduling, retries, result storage |
| python-outbox-core | Transactional Outbox pattern | Atomic DB writes + event publishing guarantee |


🔍 The Confusion: Why They Seem Similar

FastStream (by ag2ai)

What it does:

  • ✅ High-level Kafka/RabbitMQ API

  • ✅ Auto-serialization (Pydantic models)

  • ✅ Message routing

  • ✅ Consumer decorators

What it DOESN’T do:

  • No database integration

  • No transactional guarantees

  • No outbox table

  • Doesn’t solve dual-write problem

Example:

from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber("events")
async def handle_event(msg: dict):
    print(msg)

# Publishing
await broker.publish({"user_id": "123"}, topic="events")

Problem: If your DB write succeeds but Kafka is down, event is lost!


Huey (by coleifer)

What it does:

  • ✅ Task queue (like Celery but simpler)

  • ✅ Task scheduling

  • ✅ Retries

  • ✅ Result storage (Redis)

What it DOESN’T do:

  • No event streaming (it’s for tasks, not events)

  • No database integration

  • No transactional guarantees

  • Not designed for outbox pattern

Example:

from huey import RedisHuey

huey = RedisHuey('my-app')

@huey.task()
def send_email(user_id):
    # Background task
    pass

send_email.schedule(args=('user123',), delay=60)

Problem: Same as FastStream - no atomicity with DB writes!


python-outbox-core (our library)

What it does:

  • Atomic DB + event storage (same transaction!)

  • Dual-write problem solved

  • Event durability (stored in DB)

  • Retry logic (worker polls outbox table)

  • Works WITH FastStream + Celery/Huey (not instead of)

Example:

# Command handler
async def create_user(cmd, user_repo, outbox_repo, db_session):
    user = await user_repo.create(cmd.to_entity())
    event = UserCreated(user_id=user.id)

    await outbox_repo.add_event(event)  # ← Same transaction!
    await db_session.commit()  # ← Atomic!

# Worker (uses FastStream internally)
class Worker(OutboxPublisherBase):
    async def schedule_publishing(self):
        while True:
            await self.publish_batch()  # ← Publishes to FastStream/Kafka
            await asyncio.sleep(5)

🚨 The Dual-Write Problem

Without Outbox (Using FastStream/Huey directly):

async def create_user(cmd):
    # 1. Write to database
    user = await user_repo.create(cmd.to_entity())
    await db_session.commit()

    # 2. Publish event
    await faststream_broker.publish(UserCreated(...))  # ❌ NOT ATOMIC!

    # What if:
    # - Kafka is down? Event lost!
    # - App crashes here? Event lost!
    # - Network timeout? Event lost!

Result: Your database and event stream are out of sync!


With Outbox (Using python-outbox-core + FastStream):

async def create_user(cmd):
    # 1. Write to database AND outbox table
    user = await user_repo.create(cmd.to_entity())
    event = UserCreated(user_id=user.id)
    await outbox_repo.add_event(event)  # ← Outbox table (same DB)
    await db_session.commit()  # ← ATOMIC (both or neither)

    # 2. Background worker publishes from outbox
    # (Runs separately, retries on failure)

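Result: The database and the event stream stay consistent. A committed event is never lost; it is delivered at least once, so consumers should deduplicate on event_id.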
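
The "both or neither" behaviour can be checked with a few lines of standard-library code. The sketch below uses an in-memory SQLite database in place of PostgreSQL; the table layout and the fail_before_commit switch are assumptions made for the demonstration.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT UNIQUE)")
conn.execute(
    "CREATE TABLE outbox_events "
    "(id TEXT PRIMARY KEY, event_type TEXT, payload TEXT, published INTEGER DEFAULT 0)"
)


def create_user(email: str, fail_before_commit: bool = False) -> None:
    """Insert the user and its outbox row in one transaction."""
    user_id = str(uuid.uuid4())
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute("INSERT INTO users VALUES (?, ?)", (user_id, email))
            conn.execute(
                "INSERT INTO outbox_events (id, event_type, payload) VALUES (?, ?, ?)",
                (str(uuid.uuid4()), "user.created", json.dumps({"user_id": user_id})),
            )
            if fail_before_commit:
                raise RuntimeError("simulated crash")
    except RuntimeError:
        pass  # both inserts were rolled back together


def counts() -> tuple:
    users = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    events = conn.execute("SELECT COUNT(*) FROM outbox_events").fetchone()[0]
    return users, events


create_user("a@example.com")
create_user("b@example.com", fail_before_commit=True)
result = counts()
```

After the simulated crash there is exactly one user and one outbox row: the failed call left neither a user without an event nor an event without a user.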


💡 How They Work Together

┌─────────────────────────────────────────────────────────┐
│ python-outbox-core                                      │
│ ├─ IOutboxEvent (defines event structure)              │
│ ├─ IOutboxRepository (stores to outbox table)          │
│ └─ OutboxPublisherBase (worker logic)                  │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ FastStream (by ag2ai)                                   │
│ └─ KafkaBroker (publishes from outbox to Kafka)        │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Kafka                                                   │
│ └─ Event topics                                         │
└─────────────────────────────────────────────────────────┘

Or with Celery/Huey:

┌─────────────────────────────────────────────────────────┐
│ python-outbox-core                                      │
│ └─ OutboxPublisherBase (worker base class)             │
└─────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────┐
│ Celery/Huey (task scheduler)                            │
│ └─ Schedules outbox polling task every 5s              │
└─────────────────────────────────────────────────────────┘

🎯 Why You Can’t Replace python-outbox-core

FastStream CANNOT:

  1. ❌ Store events in your PostgreSQL database

  2. ❌ Make event publishing atomic with DB writes

  3. ❌ Provide retry logic for failed publishes

  4. ❌ Implement outbox pattern abstractions

Celery/Huey CANNOT:

  1. ❌ Integrate with your SQLAlchemy models

  2. ❌ Provide event-specific abstractions (IOutboxEvent)

  3. ❌ Handle CloudEvents metadata

  4. ❌ Work with Kafka/FastStream directly

python-outbox-core PROVIDES:

  1. Transactional safety (DB + outbox in one transaction)

  2. Domain event contracts (IOutboxEvent with CloudEvents)

  3. Repository pattern (IOutboxRepository for SQLAlchemy)

  4. Worker template (OutboxPublisherBase you extend)

  5. Integrations (connects to FastStream/Huey/Celery)


📋 Existing Solutions?

We checked: There are NO mature Python libraries for transactional outbox pattern.

Alternatives:

  1. pg_eventstore - Focuses on event sourcing, not outbox

  2. Custom implementations - Everyone rolls their own

  3. Java libraries - Debezium (CDC), but not for Python

python-outbox-core fills a real gap! 🎯


🚀 Recommendation

Keep python-outbox-core + Use FastStream

Your stack:

# Domain layer (your code)
event = UserCreated(...)
await outbox_repo.add_event(event)  # python-outbox-core
await db_session.commit()

# Infrastructure layer (python-outbox-core)
class Worker(OutboxPublisherBase):  # python-outbox-core base
    def __init__(self, repo, faststream_publisher):  # FastStream integration
        super().__init__(repo, faststream_publisher)

    async def schedule_publishing(self):
        # Option A: Simple loop
        while True:
            await self.publish_batch()
            await asyncio.sleep(5)

# Option B: Huey periodic task instead of the loop above.
# Huey tasks are synchronous, so each run drives one batch with asyncio.run().
@huey.periodic_task(crontab(minute='*/1'))
def publish_outbox():
    asyncio.run(worker.publish_batch())  # `worker` is a Worker instance

Final Answer

| Question | Answer |
| --- | --- |
| Can FastStream replace python-outbox-core? | NO - FastStream is for messaging, not transactional safety |
| Can Celery/Huey replace python-outbox-core? | NO - Celery/Huey is for task scheduling, not outbox pattern |
| Do you need python-outbox-core? | YES - the outbox pattern it implements is what makes DB writes + events atomic |
| How do they work together? | python-outbox-core stores events, FastStream publishes them, Celery/Huey (optional) schedules workers |


python-outbox-core is essential - it’s the foundation that makes FastStream/Celery work reliably! 🏗️

Serialization Notes

❓ Why No Custom Serializer?

Short answer: Pydantic v2 and FastStream already handle everything.


🔍 What Each Technology Does

Pydantic v2 (Built-in)

# Serialize to JSON string
json_str = event.model_dump_json()  # Handles UUID, datetime, etc.

# Deserialize from JSON string
event = UserCreatedEvent.model_validate_json(json_str)

Handles:

  • UUID → string

  • datetime → ISO 8601

  • Decimal → string

  • Nested models

  • Field validation


FastStream (Auto-Serializes)

from faststream.kafka import KafkaBroker

broker = KafkaBroker()

# FastStream auto-converts dict → JSON!
await broker.publish(
    {"user_id": "123", "email": "test@example.com"},
    topic="events"
)

Handles:

  • dict → JSON string

  • Kafka message encoding

  • Content-Type headers


⚠️ When You DO Need Serialization

Only for PostgreSQL JSONB storage:

# Store to DB
payload = event.model_dump_json()  # Pydantic built-in
orm = OutboxEventORM(id=event.event_id, payload=payload)
session.add(orm)

# Load from DB
event = UserCreatedEvent.model_validate_json(orm.payload)  # Pydantic built-in

🎯 Pattern Summary

Command Handler
    ↓
event.model_dump_json()  ← Store to PostgreSQL
    ↓
PostgreSQL JSONB
    ↓
EventClass.model_validate_json(json_str)  ← Load from DB
    ↓
event.to_message() → dict
    ↓
broker.publish(dict)  ← FastStream auto-serializes!
    ↓
Kafka (JSON)

🚨 Common Mistakes

Don’t do this:

# BAD: Manual JSON encoding
import json
payload = json.dumps(event.dict())  # Breaks UUID, datetime!

# BAD: Custom serializer
class MySerializer:
    def serialize(self, event): ...  # Why reinvent Pydantic?

Do this:

# GOOD: Use Pydantic
payload = event.model_dump_json()
event = EventClass.model_validate_json(payload)
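
Why plain json.dumps() is listed as a mistake can be seen without Pydantic at all. The sketch below uses a bare dict as a stand-in for a dumped event; the field values are made up for the demonstration.

```python
import json
from datetime import datetime, timezone
from uuid import UUID

event = {
    "event_id": UUID("550e8400-e29b-41d4-a716-446655440000"),
    "occurred_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
}

try:
    json.dumps(event)
    error = None
except TypeError as exc:
    error = str(exc)  # the encoder rejects the UUID value

# What a manual fix has to look like; Pydantic's JSON mode does this for you.
manual = json.dumps({
    "event_id": str(event["event_id"]),
    "occurred_at": event["occurred_at"].isoformat(),
})
```

Every new field type would need another hand-written conversion like the two above, which is the maintenance cost model_dump_json() removes.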

📋 Serialization Matrix

| Operation | Tool | Method |
| --- | --- | --- |
| Event → PostgreSQL | Pydantic | event.model_dump_json() |
| PostgreSQL → Event | Pydantic | EventClass.model_validate_json(str) |
| Event → Kafka | FastStream | Auto (just pass dict) |
| Kafka → Event | FastStream | Auto (handler receives dict) |


🎯 Best Practices

  1. PostgreSQL Storage: Always use model_dump_json() / model_validate_json()

  2. Kafka Publishing: Let FastStream handle serialization (just pass dict)

  3. Custom Types: Define Pydantic validators, not custom serializers

  4. Testing: Mock with dicts, not JSON strings


🔧 Event Registry Pattern

For deserializing from DB, you need an event type registry:

class EventRegistry:
    """Maps event_type string → Event class."""

    _registry: dict[str, type[IOutboxEvent]] = {
        "com.app.user.created": UserCreatedEvent,
        "com.app.user.deleted": UserDeletedEvent,
    }

    @classmethod
    def get(cls, event_type: str) -> type[IOutboxEvent]:
        return cls._registry[event_type]

# In Repository
def _deserialize(self, json_str: str) -> IOutboxEvent:
    data = json.loads(json_str)
    event_class = EventRegistry.get(data["event_type"])
    return event_class.model_validate_json(json_str)
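
A hand-maintained dict is easy to forget when a new event is added. One possible variation, sketched here with dataclasses standing in for Pydantic models, is a decorator that registers each class where it is defined. The names register, deserialize and _REGISTRY are hypothetical and not part of the library.

```python
import json
from dataclasses import dataclass

_REGISTRY: dict = {}


def register(event_type: str):
    """Class decorator that records event_type -> class."""
    def decorator(cls):
        if event_type in _REGISTRY:
            raise ValueError(f"duplicate event_type: {event_type}")
        _REGISTRY[event_type] = cls
        return cls
    return decorator


@register("com.app.user.created")
@dataclass
class UserCreated:   # stand-in for a Pydantic event model
    user_id: str
    email: str


def deserialize(event_type: str, payload: str):
    """Look up the class by event_type; unknown types fail loudly."""
    try:
        cls = _REGISTRY[event_type]
    except KeyError:
        raise LookupError(f"no event class registered for {event_type!r}") from None
    return cls(**json.loads(payload))


event = deserialize("com.app.user.created", '{"user_id": "u-1", "email": "a@example.com"}')
```

Failing loudly on an unknown event_type matters for the worker: silently skipping a row would leave it unpublished forever.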

Bottom line: No custom serializer needed. Pydantic + FastStream = complete solution. ✅