Processed Event Guard
@Service
Guard implementing the idempotent receiver pattern for Kafka consumers. Wraps the persistence-agnostic ProcessedEventRepositoryPort and ProcessedEventFactory ports.
Typical use inside a @KafkaListener:
fun handle(@Header("eventId") eventId: String?, @Payload payload: ByteArray) {
    val id = eventId ?: run { logger.warn("missing eventId — skipping"); return }
    if (!processedEventGuard.claim(id, GROUP)) {
        logger.info("Duplicate event {} — skipping", id); return
    }
    // … process …
}
The claim runs in a REQUIRES_NEW transaction, so a duplicate is detected even when the surrounding listener method has its own transaction. The dedup row is also committed even if the business transaction later rolls back, which prevents a poison message from being re-processed in an infinite loop.
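As a rough illustration of the REQUIRES_NEW behaviour described above, the following sketch shows one way such a claim could be written with Spring's TransactionTemplate. It is not the actual implementation. The class name ProcessedEventGuardSketch, the ProcessedEvent type, and the create and insert methods on the two ports are assumptions made for this example; only the constructor parameters and the claim(id, group) call come from this page. The sketch also assumes the backing store enforces uniqueness on the event id and consumer group and that a violation surfaces as DataIntegrityViolationException.

```kotlin
import org.springframework.dao.DataIntegrityViolationException
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.support.TransactionTemplate

// Hypothetical port shapes: the real ports may expose different methods.
interface ProcessedEvent
interface ProcessedEventFactory {
    fun create(eventId: String, consumerGroup: String): ProcessedEvent
}
interface ProcessedEventRepositoryPort {
    // Assumed to fail with DataIntegrityViolationException on a duplicate key.
    fun insert(event: ProcessedEvent)
}

class ProcessedEventGuardSketch(
    private val repository: ProcessedEventRepositoryPort,
    private val factory: ProcessedEventFactory,
    transactionManager: PlatformTransactionManager,
) {
    // Every execute() call suspends any surrounding transaction and runs in its own.
    private val requiresNew = TransactionTemplate(transactionManager).apply {
        propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
    }

    /** Returns true if this call recorded the event first, false if it was already claimed. */
    fun claim(eventId: String, consumerGroup: String): Boolean =
        try {
            requiresNew.execute { repository.insert(factory.create(eventId, consumerGroup)) }
            true // the dedup row is committed, independent of the caller's transaction
        } catch (e: DataIntegrityViolationException) {
            false // the unique constraint rejected the row: duplicate delivery
        }
}
```

Because the inner transaction commits before the listener's business logic runs, a later rollback of the business transaction leaves the dedup row in place. In this sketch the exception is caught outside execute, so a violation raised at commit time is handled the same way as one raised by the insert itself.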
Constructors
constructor(repository: ProcessedEventRepositoryPort, factory: ProcessedEventFactory, transactionManager: PlatformTransactionManager)