Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
#72 Realtime chain reorganisation. Renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
arturalbov authored and hleb-albau committed Apr 6, 2018
1 parent dfe62f4 commit a26e01c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import fund.cyber.common.StackCache
import fund.cyber.pump.common.kafka.KafkaBlockBundleProducer
import fund.cyber.pump.common.kafka.LastPumpedBundlesProvider
import fund.cyber.pump.common.node.BlockBundle
import fund.cyber.pump.common.node.BlockBundleMapper
import fund.cyber.pump.common.node.BlockBundleEventGenerator
import fund.cyber.pump.common.node.FlowableBlockchainInterface
import fund.cyber.search.configuration.STACK_CACHE_SIZE
import fund.cyber.search.configuration.STACK_CACHE_SIZE_DEFAULT
Expand Down Expand Up @@ -39,7 +39,7 @@ class ChainPump<T : BlockBundle>(
private val startBlockNumber: Long,
@Value("\${$STACK_CACHE_SIZE:$STACK_CACHE_SIZE_DEFAULT}")
private val stackCacheSize: Int,
private val blockBundleMapper: BlockBundleMapper<T>,
private val blockBundleEventGenerator: BlockBundleEventGenerator<T>,
private val applicationContext: ConfigurableApplicationContext
) {

Expand All @@ -64,7 +64,7 @@ class ChainPump<T : BlockBundle>(
val history = initializeStackCache()

flowableBlockchainInterface.subscribeBlocks(startBlockNumber)
.flatMap { blockBundle -> blockBundleMapper.map(blockBundle, history).toFlowable() }
.flatMap { blockBundle -> blockBundleEventGenerator.generate(blockBundle, history).toFlowable() }
.buffer(BLOCK_BUFFER_TIMESPAN, TimeUnit.SECONDS)
.blockingSubscribe(
{ blockBundleEvents ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ import io.micrometer.core.instrument.MeterRegistry
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

interface BlockBundleMapper<T : BlockBundle> {
fun map(blockBundle: T, history: StackCache<T>): List<Pair<PumpEvent, T>>
interface BlockBundleEventGenerator<T : BlockBundle> {
fun generate(blockBundle: T, history: StackCache<T>): List<Pair<PumpEvent, T>>
}

private val log = LoggerFactory.getLogger(CommonBlockBundleMapper::class.java)!!
private val log = LoggerFactory.getLogger(CommonBlockBundleEventGenerator::class.java)!!

@Component
class CommonBlockBundleMapper<T : BlockBundle>(
class CommonBlockBundleEventGenerator<T : BlockBundle>(
private val blockchainInterface: FlowableBlockchainInterface<T>,
monitoring: MeterRegistry
) : BlockBundleMapper<T> {
) : BlockBundleEventGenerator<T> {

private val chainReorganizationMonitor = monitoring.counter("pump_chain_reorganization_counter")

override fun map(blockBundle: T, history: StackCache<T>): List<Pair<PumpEvent, T>> {
override fun generate(blockBundle: T, history: StackCache<T>): List<Pair<PumpEvent, T>> {
val exHash = history.peek()?.hash ?: ""
if (exHash.isNotEmpty() && blockBundle.parentHash != exHash) {
log.info("Chain reorganization occurred. Processing involved bundles")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ data class TestBlockBundle(
override val blockSize: Int
) : BlockBundle

class BlockBundleMapperTest {
class BlockBundleEventGeneratorTest {

@Test
fun normalFlowTest() {
Expand All @@ -29,10 +29,10 @@ class BlockBundleMapperTest {
history.push(blockB)
history.push(blockC)

val blockBundleMapper = CommonBlockBundleMapper<TestBlockBundle>(mock(), SimpleMeterRegistry())
val blockBundleMapper = CommonBlockBundleEventGenerator<TestBlockBundle>(mock(), SimpleMeterRegistry())


val result = blockBundleMapper.map(blockD, history)
val result = blockBundleMapper.generate(blockD, history)

Assertions.assertThat(result.size).isEqualTo(1)
Assertions.assertThat(result).containsExactly(
Expand Down Expand Up @@ -68,10 +68,10 @@ class BlockBundleMapperTest {
on { blockBundleByNumber(5) }.thenReturn(blockH)
}

val blockBundleMapper = CommonBlockBundleMapper(blockchainInterface, SimpleMeterRegistry())
val blockBundleMapper = CommonBlockBundleEventGenerator(blockchainInterface, SimpleMeterRegistry())


val result = blockBundleMapper.map(blockK, history)
val result = blockBundleMapper.generate(blockK, history)

Assertions.assertThat(result.size).isEqualTo(7)
Assertions.assertThat(result.filter { pair -> pair.first == PumpEvent.DROPPED_BLOCK }.size).isEqualTo(3)
Expand Down

0 comments on commit a26e01c

Please sign in to comment.