KafkaOutboxProcessor

@Service
class KafkaOutboxProcessor(outboxRepository: OutboxRepositoryPort, outboxMessageFactory: OutboxMessageFactory, kafkaTemplate: KafkaTemplate<String, ByteArray>, properties: KafkaOutboxProperties, dispatchTx: OutboxDispatchTx)

Reusable Kafka outbox processor implementing the transactional outbox pattern (Richardson, Microservices Patterns, ch. 3).

Dispatch is a two-phase flow that decouples broker IO from the caller's business transaction and prevents the same row from being published twice by concurrent poller instances:

  1. Claim (OutboxDispatchTx.claimBatch) — SELECT … FOR UPDATE SKIP LOCKED + transition rows to PROCESSING in a short transaction.

  2. Publish (dispatch) — broker network IO outside any DB transaction. Status transitions on success/failure happen in fresh REQUIRES_NEW transactions via OutboxDispatchTx so the broker call never holds a DB connection.
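The two phases above can be sketched with in-memory stand-ins. This is a minimal illustration, not the actual implementation: Row, InMemoryDispatchTx, markSent, markFailed, and the SENT and FAILED statuses are hypothetical names chosen for the example, and a synchronized block plays the role of the short claiming transaction.

```kotlin
// Hypothetical statuses; only PROCESSING is named in the documentation above.
enum class Status { PENDING, PROCESSING, SENT, FAILED }

// Simplified stand-in for an outbox row.
class Row(val id: Int, val topic: String, val payload: ByteArray) {
    var status: Status = Status.PENDING
}

// Hypothetical in-memory substitute for the transactional helper bean.
class InMemoryDispatchTx(private val rows: List<Row>) {
    // Phase 1: pick up to `limit` pending rows and mark them PROCESSING.
    // @Synchronized stands in for the short claiming transaction.
    @Synchronized
    fun claimBatch(limit: Int): List<Row> =
        rows.filter { it.status == Status.PENDING }
            .take(limit)
            .onEach { it.status = Status.PROCESSING }

    // Each outcome is recorded separately, mirroring a fresh REQUIRES_NEW transaction.
    @Synchronized
    fun markSent(row: Row) { row.status = Status.SENT }

    @Synchronized
    fun markFailed(row: Row) { row.status = Status.FAILED }
}

// Phase 2: publish after the claim has completed; `send` stands in for the broker call.
fun pollAndDispatch(tx: InMemoryDispatchTx, limit: Int, send: (Row) -> Unit) {
    for (row in tx.claimBatch(limit)) {
        try {
            send(row)
            tx.markSent(row)
        } catch (e: Exception) {
            tx.markFailed(row)
        }
    }
}

fun main() {
    val rows = List(3) { Row(it, "orders", byteArrayOf(it.toByte())) }
    val tx = InMemoryDispatchTx(rows)
    // Simulate a broker failure for row 1 only.
    pollAndDispatch(tx, limit = 10) { row ->
        if (row.id == 1) throw IllegalStateException("broker unavailable")
    }
    println(rows.map { it.status }) // [SENT, FAILED, SENT]
}
```

Because the claim finishes before any send starts, a slow or failing broker in this sketch never holds the claiming lock, which is the property the two-phase split is meant to provide.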

The transactional helpers live in a separate bean (OutboxDispatchTx) so the calls go through Spring's AOP proxy and actually open the requested transaction (self-invocation on this would bypass the proxy).
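The proxy bypass can be illustrated without Spring. In this sketch a hand-written decorator plays the role of the AOP proxy and simply counts how often its "transactional" method is entered; all type names here (Outbox, CountingProxy, SelfInvoking, Delegating) are hypothetical and exist only for the illustration.

```kotlin
interface Outbox {
    fun dispatch(id: Int)
    fun markSent(id: Int) // imagine this method carries the transactional annotation
}

// Stand-in for the proxy wrapped around a bean: transactional advice would run in markSent.
class CountingProxy(private val target: Outbox) : Outbox {
    var transactionsOpened = 0
        private set

    override fun dispatch(id: Int) = target.dispatch(id)

    override fun markSent(id: Int) {
        transactionsOpened++ // a real proxy would open the transaction here
        target.markSent(id)
    }
}

// Helper method on the same class: the call goes through `this`, not through the proxy.
class SelfInvoking : Outbox {
    override fun dispatch(id: Int) = this.markSent(id)
    override fun markSent(id: Int) {}
}

// Helper lives in a separate, injected bean: the call goes through that bean's proxy.
class Delegating(private val helper: Outbox) : Outbox {
    override fun dispatch(id: Int) = helper.markSent(id)
    override fun markSent(id: Int) {}
}

fun main() {
    val selfProxy = CountingProxy(SelfInvoking())
    selfProxy.dispatch(1)
    println(selfProxy.transactionsOpened) // 0: self-invocation never reached the proxy

    val helperProxy = CountingProxy(SelfInvoking())
    Delegating(helperProxy).dispatch(1)
    println(helperProxy.transactionsOpened) // 1: the injected helper was called via its proxy
}
```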

Depends only on persistence-agnostic ports (OutboxRepositoryPort and OutboxMessageFactory) so the outbox can be backed by any storage technology.

Constructors

constructor(outboxRepository: OutboxRepositoryPort, outboxMessageFactory: OutboxMessageFactory, kafkaTemplate: KafkaTemplate<String, ByteArray>, properties: KafkaOutboxProperties, dispatchTx: OutboxDispatchTx)

Functions

@Scheduled(fixedDelayString = "${veds.outbox.poll-interval-ms:5000}")
fun pollAndDispatch()

Scheduled poll that runs with a fixed delay of veds.outbox.poll-interval-ms milliseconds (default 5000), measured from the end of one run to the start of the next. Claims a batch of messages via OutboxDispatchTx.claimBatch and dispatches each one to Kafka outside any DB transaction. Safe to run on multiple instances: with FOR UPDATE SKIP LOCKED, a row locked by one poller is skipped by the others, so each row is claimed by only one poller.
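The multi-instance behaviour can be approximated in memory. In this sketch, which assumes nothing about the real repository, an atomic compare-and-set stands in for the row lock: a poller that loses the race skips the row instead of waiting, loosely mirroring SKIP LOCKED. ClaimableRow, claimBatch, and runPollers are hypothetical names.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

class ClaimableRow(val id: Int) {
    private val claimed = AtomicBoolean(false)

    // Succeeds for exactly one caller; every other caller gets false and moves on.
    fun tryClaim(): Boolean = claimed.compareAndSet(false, true)
}

// Collects up to `limit` rows that this caller managed to claim.
fun claimBatch(rows: List<ClaimableRow>, limit: Int): List<ClaimableRow> {
    val batch = mutableListOf<ClaimableRow>()
    for (row in rows) {
        if (batch.size == limit) break
        if (row.tryClaim()) batch += row
    }
    return batch
}

// Runs several pollers concurrently and returns the ids they dispatched, sorted.
fun runPollers(rowCount: Int, pollers: Int, batchSize: Int): List<Int> {
    val rows = List(rowCount) { ClaimableRow(it) }
    val dispatched = ConcurrentLinkedQueue<Int>()
    val threads = List(pollers) {
        thread {
            while (true) {
                val batch = claimBatch(rows, batchSize)
                if (batch.isEmpty()) break
                batch.forEach { dispatched += it.id }
            }
        }
    }
    threads.forEach { it.join() }
    return dispatched.sorted()
}

fun main() {
    val ids = runPollers(rowCount = 100, pollers = 4, batchSize = 7)
    // Every row appears once: none dispatched twice, none left behind.
    println(ids == (0 until 100).toList()) // true
}
```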

@Transactional
fun saveOutboxMessage(topic: String, key: String, payload: ByteArray, sagaId: String? = null, eventId: String? = null): OutboxMessage

Creates a new outbox message for a raw topic name. sagaId and eventId are optional and default to null.
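The reason for writing the outbox message transactionally can be sketched as follows: the business write and the outbox row should either both commit or both roll back. The example below is a simplified model, not the real API. FakeDb, placeOrder, the topic name orders.placed, and the free-standing saveOutboxMessage (which only borrows the parameter list shown above) are all assumptions made for illustration.

```kotlin
// Simplified stand-in for the real OutboxMessage type.
class OutboxMessage(
    val topic: String,
    val key: String,
    val payload: ByteArray,
    val sagaId: String?,
    val eventId: String?,
)

// Hypothetical in-memory database with all-or-nothing transactions.
class FakeDb {
    val orders = mutableListOf<String>()
    val outbox = mutableListOf<OutboxMessage>()

    // Runs `block` against staged copies and applies them only if it completes normally.
    fun <T> inTransaction(block: (MutableList<String>, MutableList<OutboxMessage>) -> T): T {
        val stagedOrders = orders.toMutableList()
        val stagedOutbox = outbox.toMutableList()
        val result = block(stagedOrders, stagedOutbox)
        orders.clear(); orders += stagedOrders
        outbox.clear(); outbox += stagedOutbox
        return result
    }
}

// Mirrors the documented parameter list, including the null defaults.
fun saveOutboxMessage(
    outbox: MutableList<OutboxMessage>,
    topic: String,
    key: String,
    payload: ByteArray,
    sagaId: String? = null,
    eventId: String? = null,
): OutboxMessage = OutboxMessage(topic, key, payload, sagaId, eventId).also { outbox += it }

// Business operation: stores the order and its outbox message in one transaction.
fun placeOrder(db: FakeDb, orderId: String, fail: Boolean = false): OutboxMessage =
    db.inTransaction { orders, outbox ->
        orders += orderId
        val message = saveOutboxMessage(
            outbox,
            topic = "orders.placed",
            key = orderId,
            payload = orderId.toByteArray(),
            eventId = "evt-$orderId",
        )
        if (fail) throw IllegalStateException("business rule violated")
        message
    }

fun main() {
    val db = FakeDb()
    placeOrder(db, "o-1")
    runCatching { placeOrder(db, "o-2", fail = true) } // rolls back both writes
    println(db.orders)                 // [o-1]
    println(db.outbox.map { it.key })  // [o-1]
}
```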