python_outbox_core.publisher.base

Base publisher/worker for outbox event publishing.

Best Practices Applied: 1. Template Method pattern - reusable core, custom schedule 2. Batch processing for efficiency 3. Dependency injection for error handling & metrics; error handling with structured logging 4. Single Responsibility - only batch orchestration; only publishes, doesn’t schedule 5. Open/Closed principle - extend, don’t modify

References: - Template Method: https://refactoring.guru/design-patterns/template-method - Competing Consumers: https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html

Classes

OutboxPublisherBase

Reusable outbox worker logic.

Module Contents

class python_outbox_core.publisher.base.OutboxPublisherBase(repository: python_outbox_core.repository.IOutboxRepository, publisher: python_outbox_core.publisher.interface.IEventPublisher, error_handler: python_outbox_core.publisher.error_handler.OutboxErrorHandler | None = None, metrics: python_outbox_core.publisher.metrics.OutboxMetrics | None = None)

Bases: abc.ABC

Reusable outbox worker logic.

Handles: - Batch fetching from outbox - Publishing to broker (via IEventPublisher) - Marking events as published (via IOutboxRepository) - Error handling (via OutboxErrorHandler) - Metrics/logging (via OutboxMetrics)

Projects extend this and implement schedule_publishing().

repository
publisher
error_handler
metrics
async publish_batch(limit: int = 100) int

Fetch and publish a batch of unpublished events.

Args:

limit: Max events to process in this batch

Returns:

Number of successfully published events

abstract schedule_publishing() None
Async:

Implement your scheduling strategy.

Options: - Infinite loop: while True + asyncio.sleep() - Celery beat: Distributed scheduling - K8s CronJob: Container-based scheduling