diff --git a/common-kafka/src/main/kotlin/fund/cyber/common/kafka/Configuration.kt b/common-kafka/src/main/kotlin/fund/cyber/common/kafka/Configuration.kt index 75cf9eb9..d5269883 100644 --- a/common-kafka/src/main/kotlin/fund/cyber/common/kafka/Configuration.kt +++ b/common-kafka/src/main/kotlin/fund/cyber/common/kafka/Configuration.kt @@ -12,9 +12,19 @@ fun defaultConsumerConfig() = mutableMapOf( ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG ) + +fun idempotentProducerDefaultConfig() = mutableMapOf( + ProducerConfig.MAX_REQUEST_SIZE_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES, + /* This settings guarantee exactly once, in order delivery per partition for topics with replication-factor >= 2, + and assuming that the system doesn't suffer multiple hard failures or concurrent transient failures. + https://bit.ly/2IjnHxl */ + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 2, + ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true +) + fun defaultProducerConfig() = mutableMapOf( ProducerConfig.MAX_REQUEST_SIZE_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES, ProducerConfig.ACKS_CONFIG to "all" ) - - diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt index 3408de05..a889a80a 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaBlockBundleProducer.kt @@ -6,7 +6,6 @@ import fund.cyber.search.model.chains.ChainInfo import fund.cyber.search.model.events.PumpEvent import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component -import org.springframework.transaction.annotation.Transactional @Component @@ -15,15 +14,18 @@ class KafkaBlockBundleProducer( private val chainInfo: ChainInfo ) { - @Transactional fun storeBlockBundle(blockBundleEvents: List>) { - blockBundleEvents.forEach { event -> - val eventKey = event.first - val blockBundle = event.second - chainInfo.entityTypes.forEach { type -> - blockBundle.entitiesByType(type).forEach { entity -> - kafkaTemplate.send(type.kafkaTopicName(chainInfo), eventKey, entity) + kafkaTemplate.executeInTransaction { + + blockBundleEvents.forEach { event -> + val eventKey = event.first + val blockBundle = event.second + + chainInfo.entityTypes.forEach { type -> + blockBundle.entitiesByType(type).forEach { entity -> + kafkaTemplate.send(type.kafkaTopicName(chainInfo), eventKey, entity) + } } } } diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaProducerConfiguration.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaProducerConfiguration.kt index 10965816..4c6e965d 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaProducerConfiguration.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/kafka/KafkaProducerConfiguration.kt @@ -2,6 +2,7 @@ package fund.cyber.pump.common.kafka import fund.cyber.common.kafka.JsonSerializer import fund.cyber.common.kafka.defaultProducerConfig +import fund.cyber.common.kafka.idempotentProducerDefaultConfig import fund.cyber.common.kafka.kafkaTopicName import fund.cyber.common.with import fund.cyber.search.configuration.KAFKA_BROKERS @@ -22,9 +23,6 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaAdmin import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.transaction.KafkaTransactionManager -import org.springframework.transaction.annotation.EnableTransactionManagement -import org.springframework.transaction.support.AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS import java.util.concurrent.TimeUnit import javax.annotation.PostConstruct @@ -33,7 +31,6 @@ private const val TOPIC_REPLICATION_FACTOR: Short = 3 @EnableKafka @Configuration -@EnableTransactionManagement class KafkaProducerConfiguration { @Value("\${$KAFKA_BROKERS:$KAFKA_BROKERS_DEFAULT}") @@ -45,17 +42,17 @@ class KafkaProducerConfiguration { @Bean fun kafkaAdmin(): KafkaAdmin { val configs = mapOf( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers ) return KafkaAdmin(configs) } @Bean fun producerFactory(): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()) - .apply { - setTransactionIdPrefix(chainInfo.name + "_PUMP") - } + + val configs = idempotentProducerDefaultConfig().with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers) + return DefaultKafkaProducerFactory(configs, JsonSerializer(), JsonSerializer()) + .apply { setTransactionIdPrefix(chainInfo.name + "_PUMP") } } @Bean @@ -65,7 +62,9 @@ class KafkaProducerConfiguration { @Bean fun producerFactoryPool(): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs(), JsonSerializer(), JsonSerializer()) + + val configs = defaultProducerConfig().with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers) + return DefaultKafkaProducerFactory(configs, JsonSerializer(), JsonSerializer()) } @Bean @@ -73,25 +72,13 @@ class KafkaProducerConfiguration { return KafkaTemplate(producerFactoryPool()) } - @Bean - fun producerConfigs(): Map = defaultProducerConfig().with( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers - ) - - @Bean - fun transactionManager(): KafkaTransactionManager { - return KafkaTransactionManager(producerFactory()).apply { - transactionSynchronization = SYNCHRONIZATION_ALWAYS - } - } - @Bean fun topicConfigs(): Map { return mapOf( - TopicConfig.RETENTION_MS_CONFIG to TimeUnit.DAYS.toMillis(CLEANUP_RETENTION_POLICY_TIME_DAYS) - .toString(), - TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE, - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG to "false" + TopicConfig.RETENTION_MS_CONFIG to TimeUnit.DAYS.toMillis(CLEANUP_RETENTION_POLICY_TIME_DAYS) + .toString(), + TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG to "false" ) } diff --git a/pumps/common/src/main/kotlin/fund/cyber/pump/common/pool/KafkaPoolItemProducer.kt b/pumps/common/src/main/kotlin/fund/cyber/pump/common/pool/KafkaPoolItemProducer.kt index c1ebfee1..0da6cc6d 100644 --- a/pumps/common/src/main/kotlin/fund/cyber/pump/common/pool/KafkaPoolItemProducer.kt +++ b/pumps/common/src/main/kotlin/fund/cyber/pump/common/pool/KafkaPoolItemProducer.kt @@ -23,7 +23,6 @@ class KafkaPoolItemProducer( kafkaTemplatePool.send(ChainEntityType.TX.kafkaTopicName(chainInfo), event, item) .addCallback({ _ -> }) { error -> log.error("Error during sending mempool item to kafka", error) - storeItem(itemEvent) } } }