Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#219 Add callbacks on kafka producers send() operations in pump
Browse files Browse the repository at this point in the history
  • Loading branch information
arturalbov authored and hleb-albau committed Jun 23, 2018
1 parent 32a807b commit 249d6cd
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@ fun defaultConsumerConfig() = mutableMapOf<String, Any>(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to SESSION_TIMEOUT_MS_CONFIG
)


/**
 * Producer settings for exactly-once, in-order delivery per partition.
 *
 * These settings guarantee exactly-once, in-order delivery per partition for
 * topics with replication-factor >= 2, assuming the system doesn't suffer
 * multiple hard failures or concurrent transient failures.
 * https://bit.ly/2IjnHxl
 */
fun idempotentProducerDefaultConfig() = mutableMapOf<String, Any>(
    ProducerConfig.MAX_REQUEST_SIZE_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES,
    // Require acknowledgement from the full in-sync replica set.
    ProducerConfig.ACKS_CONFIG to "all",
    // Cap in-flight requests so retried batches cannot be reordered.
    ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 2,
    // Retry transient send failures indefinitely; idempotence deduplicates.
    ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE,
    ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true
)

/**
 * Default (non-idempotent) producer settings: capped request size and
 * acknowledgement from the full in-sync replica set.
 */
fun defaultProducerConfig() = mutableMapOf<String, Any>(
    ProducerConfig.MAX_REQUEST_SIZE_CONFIG to KAFKA_MAX_MESSAGE_SIZE_BYTES,
    ProducerConfig.ACKS_CONFIG to "all"
)


Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import fund.cyber.search.model.chains.ChainInfo
import fund.cyber.search.model.events.PumpEvent
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional


@Component
Expand All @@ -15,15 +14,18 @@ class KafkaBlockBundleProducer(
private val chainInfo: ChainInfo
) {

@Transactional
fun storeBlockBundle(blockBundleEvents: List<Pair<PumpEvent, BlockBundle>>) {
blockBundleEvents.forEach { event ->
val eventKey = event.first
val blockBundle = event.second

chainInfo.entityTypes.forEach { type ->
blockBundle.entitiesByType(type).forEach { entity ->
kafkaTemplate.send(type.kafkaTopicName(chainInfo), eventKey, entity)
kafkaTemplate.executeInTransaction {

blockBundleEvents.forEach { event ->
val eventKey = event.first
val blockBundle = event.second

chainInfo.entityTypes.forEach { type ->
blockBundle.entitiesByType(type).forEach { entity ->
kafkaTemplate.send(type.kafkaTopicName(chainInfo), eventKey, entity)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fund.cyber.pump.common.kafka

import fund.cyber.common.kafka.JsonSerializer
import fund.cyber.common.kafka.defaultProducerConfig
import fund.cyber.common.kafka.idempotentProducerDefaultConfig
import fund.cyber.common.kafka.kafkaTopicName
import fund.cyber.common.with
import fund.cyber.search.configuration.KAFKA_BROKERS
Expand All @@ -22,9 +23,6 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaAdmin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.transaction.KafkaTransactionManager
import org.springframework.transaction.annotation.EnableTransactionManagement
import org.springframework.transaction.support.AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS
import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct

Expand All @@ -33,7 +31,6 @@ private const val TOPIC_REPLICATION_FACTOR: Short = 3

@EnableKafka
@Configuration
@EnableTransactionManagement
class KafkaProducerConfiguration {

@Value("\${$KAFKA_BROKERS:$KAFKA_BROKERS_DEFAULT}")
Expand All @@ -45,17 +42,17 @@ class KafkaProducerConfiguration {
@Bean
fun kafkaAdmin(): KafkaAdmin {
val configs = mapOf(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers
)
return KafkaAdmin(configs)
}

@Bean
fun producerFactory(): ProducerFactory<PumpEvent, Any> {
return DefaultKafkaProducerFactory<PumpEvent, Any>(producerConfigs(), JsonSerializer(), JsonSerializer())
.apply {
setTransactionIdPrefix(chainInfo.name + "_PUMP")
}

val configs = idempotentProducerDefaultConfig().with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers)
return DefaultKafkaProducerFactory<PumpEvent, Any>(configs, JsonSerializer(), JsonSerializer())
.apply { setTransactionIdPrefix(chainInfo.name + "_PUMP") }
}

@Bean
Expand All @@ -65,33 +62,23 @@ class KafkaProducerConfiguration {

@Bean
fun producerFactoryPool(): ProducerFactory<PumpEvent, Any> {
return DefaultKafkaProducerFactory<PumpEvent, Any>(producerConfigs(), JsonSerializer(), JsonSerializer())

val configs = defaultProducerConfig().with(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers)
return DefaultKafkaProducerFactory<PumpEvent, Any>(configs, JsonSerializer(), JsonSerializer())
}

@Bean
fun kafkaTemplatePool(): KafkaTemplate<PumpEvent, Any> {
return KafkaTemplate(producerFactoryPool())
}

@Bean
fun producerConfigs(): Map<String, Any> = defaultProducerConfig().with(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaBrokers
)

@Bean
fun transactionManager(): KafkaTransactionManager<PumpEvent, Any> {
return KafkaTransactionManager(producerFactory()).apply {
transactionSynchronization = SYNCHRONIZATION_ALWAYS
}
}

@Bean
fun topicConfigs(): Map<String, String> {
return mapOf(
TopicConfig.RETENTION_MS_CONFIG to TimeUnit.DAYS.toMillis(CLEANUP_RETENTION_POLICY_TIME_DAYS)
.toString(),
TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE,
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG to "false"
TopicConfig.RETENTION_MS_CONFIG to TimeUnit.DAYS.toMillis(CLEANUP_RETENTION_POLICY_TIME_DAYS)
.toString(),
TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_DELETE,
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG to "false"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class KafkaPoolItemProducer(
kafkaTemplatePool.send(ChainEntityType.TX.kafkaTopicName(chainInfo), event, item)
.addCallback({ _ -> }) { error ->
log.error("Error during sending mempool item to kafka", error)
storeItem(itemEvent)
}
}
}

0 comments on commit 249d6cd

Please sign in to comment.