python_outbox_core.publisher.base¶
Base publisher/worker for outbox event publishing.
Best Practices Applied: 1. Template Method pattern - reusable core, custom schedule 2. Batch processing for efficiency 3. Dependency injection for error handling & metrics; error handling with structured logging 4. Single Responsibility - only batch orchestration; only publishes, doesn’t schedule 5. Open/Closed principle - extend, don’t modify
References: - Template Method: https://refactoring.guru/design-patterns/template-method - Competing Consumers: https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html
Classes¶
Reusable outbox worker logic. |
Module Contents¶
- class python_outbox_core.publisher.base.OutboxPublisherBase(repository: python_outbox_core.repository.IOutboxRepository, publisher: python_outbox_core.publisher.interface.IEventPublisher, error_handler: python_outbox_core.publisher.error_handler.OutboxErrorHandler | None = None, metrics: python_outbox_core.publisher.metrics.OutboxMetrics | None = None)¶
Bases:
abc.ABCReusable outbox worker logic.
Handles: - Batch fetching from outbox - Publishing to broker (via IEventPublisher) - Marking events as published (via IOutboxRepository) - Error handling (via OutboxErrorHandler) - Metrics/logging (via OutboxMetrics)
Projects extend this and implement schedule_publishing().
- repository¶
- publisher¶
- error_handler¶
- metrics¶
- async publish_batch(limit: int = 100) int¶
Fetch and publish a batch of unpublished events.
- Args:
limit: Max events to process in this batch
- Returns:
Number of successfully published events
- abstract schedule_publishing() None¶
- Async:
Implement your scheduling strategy.
Options: - Infinite loop: while True + asyncio.sleep() - Celery beat: Distributed scheduling - K8s CronJob: Container-based scheduling