Ergo bots tracker #6

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
name = "events-streaming"
version = "0.1.0"
edition = "2021"
-rust-version = "1.67.1"
+rust-version = "1.71.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

6 changes: 4 additions & 2 deletions Dockerfile
@@ -1,7 +1,9 @@
-FROM rust:1.67.1
+FROM rust:1.73.0
WORKDIR /usr/src/events-streaming
COPY . .
RUN apt-get update && apt-get install make clang pkg-config libssl-dev glibc-source gcc libstdc++6 -y
RUN cargo install --path .

-CMD ["events-streaming", "--config-yaml-path=/etc/config.yml"]
+ARG URL
+
+CMD events-streaming --config-yaml-path=/usr/src/events-streaming/conf/config.yml --node-addr=${URL}
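Note on the new ARG/CMD pair (an observation about this hunk, not part of the diff): ARG values exist only at build time, and the shell-form CMD expands ${URL} from the container's environment when it starts. As written, ${URL} will be empty at runtime unless the variable is supplied at docker run time (e.g. with -e URL=...) or persisted in the image. A minimal, illustrative bridge via ENV, not in this PR:

ARG URL
# Persist the build-time ARG into the runtime environment so the shell-form CMD can expand it.
ENV URL=${URL}

CMD events-streaming --config-yaml-path=/usr/src/events-streaming/conf/config.yml --node-addr=${URL}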
10 changes: 4 additions & 6 deletions conf/config.yml
@@ -1,11 +1,9 @@
-node_addr: http://213.239.193.208:9053
http_client_timeout_duration_secs: 5
-chain_sync_starting_height: 912500
+chain_sync_starting_height: 1143232
log4rs_yaml_path: conf/log4rs.yaml
chain_cache_db_path: ./tmp/chain
mempool_cache_db_path: ./tmp/mempool
kafka_address: "kafka:9092"
-blocks_topic: "blocks_topic"
-tx_topic: "tx_topic"
-mempool_topic: "mempool_topic"
-mempool_sync_interval: 1
+tx_topic: "ledger-tx-topic"
+mempool_topic: "mempool-tx-topic"
+mempool_sync_interval: 500
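For orientation: mempool_sync_interval is now consumed as milliseconds (src/main.rs below switches to Duration::from_millis), so 500 here means a 500 ms polling interval rather than 500 seconds, and the node address has moved out of this file into a --node-addr CLI flag. A minimal sketch of how the interval field is interpreted, with field names taken from the AppConfig struct in this PR and the rest illustrative:

use serde::Deserialize;
use std::time::Duration;

// Illustrative subset of the YAML config; mirrors fields kept in src/main.rs.
#[derive(Deserialize)]
struct MempoolSettings {
    mempool_topic: String,
    mempool_sync_interval: u64, // milliseconds after this PR
}

fn sync_interval(settings: &MempoolSettings) -> Duration {
    // mempool_sync_interval: 500 in conf/config.yml => poll the mempool every 500 ms.
    Duration::from_millis(settings.mempool_sync_interval)
}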
50 changes: 12 additions & 38 deletions src/event_source.rs
@@ -15,43 +15,17 @@ use crate::models::tx_event::TxEvent;
use async_std::task::spawn_blocking;

pub fn block_event_source<S>(
-    upstream: S,
-    producer: Producer,
-    topic: String,
-) -> impl Stream<Item = ChainUpgrade>
-where
-    S: Stream<Item = ChainUpgrade>,
+    upstream: S
+) -> impl Stream<Item=ChainUpgrade>
+where
+    S: Stream<Item=ChainUpgrade>,
{
-    let topic = Arc::new(tokio::sync::Mutex::new(topic));
-    let producer = Arc::new(std::sync::Mutex::new(producer));
-    upstream.then(move |ev| {
-        let topic = topic.clone();
-        let producer = producer.clone();
-        let ev_clone = ev.clone();
-        async move {
-            let block_event = BlockEvent::from(ev_clone);
-            let block_id: String = match block_event.clone() {
-                BlockEvent::BlockApply { id, .. } | BlockEvent::BlockUnapply { id, .. } => id,
-            };
-            let value = serde_json::to_string(&block_event).unwrap();
-            let topic = topic.clone().lock().await.clone();
-            spawn_blocking(move || {
-                let rec: &Record<String, String> =
-                    &Record::from_key_value(topic.as_str(), block_id.clone(), value.clone());
-                info!("Block value is: ${:?}", value.clone());
-                info!("Got new block. Key: ${:?}", block_id);
-                producer.lock().unwrap().send(rec).unwrap();
-                info!("New block processed by kafka. Key: ${:?}", block_id);
-            })
-            .await;
-            ev
-        }
-    })
+    upstream
}

-pub fn tx_event_source<S>(upstream: S) -> impl Stream<Item = TxEvent>
-where
-    S: Stream<Item = ChainUpgrade>,
+pub fn tx_event_source<S>(upstream: S) -> impl Stream<Item=TxEvent>
+where
+    S: Stream<Item=ChainUpgrade>,
{
upstream.flat_map(|u| stream::iter(process_upgrade(u)))
}
@@ -81,9 +55,9 @@ pub fn mempool_event_source<S>(
upstream: S,
producer: Producer,
topic: String,
-) -> impl Stream<Item = ()>
-where
-    S: Stream<Item = MempoolUpdate>,
+) -> impl Stream<Item=()>
+where
+    S: Stream<Item=MempoolUpdate>,
{
let topic = Arc::new(tokio::sync::Mutex::new(topic));
let producer = Arc::new(std::sync::Mutex::new(producer));
@@ -108,7 +82,7 @@ where
producer.lock().unwrap().send(rec).unwrap();
info!("New mempool event processed by kafka. Key: ${:?}", tx_id);
})
-    .await;
+        .await;
}
}
})
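With this change block_event_source no longer publishes to Kafka; it simply forwards the upstream ChainUpgrade stream, and only the mempool source (plus the ProxyEvents handler fed by producer2 in main.rs) still writes to Kafka. If block publishing were ever needed again, a sink could be reattached downstream with the same Producer/Record pattern this file keeps for the mempool. A rough sketch, assuming the crate's ChainUpgrade and BlockEvent types; this helper is hypothetical and not part of the PR:

use std::sync::{Arc, Mutex};

use async_std::task::spawn_blocking;
use futures::{Stream, StreamExt};
use kafka::producer::{Producer, Record};
use log::info;

// Hypothetical downstream sink for block events; ChainUpgrade and BlockEvent
// are this crate's types, imported as in src/event_source.rs.
fn publish_blocks<S>(upstream: S, producer: Producer, topic: String) -> impl Stream<Item = ChainUpgrade>
where
    S: Stream<Item = ChainUpgrade>,
{
    let producer = Arc::new(Mutex::new(producer));
    upstream.then(move |ev| {
        let producer = producer.clone();
        let topic = topic.clone();
        async move {
            let block_event = BlockEvent::from(ev.clone());
            let key = match &block_event {
                BlockEvent::BlockApply { id, .. } | BlockEvent::BlockUnapply { id, .. } => id.clone(),
            };
            let value = serde_json::to_string(&block_event).unwrap();
            // kafka's Producer::send is blocking, so keep it off the async executor.
            spawn_blocking(move || {
                let rec = Record::from_key_value(topic.as_str(), key, value);
                producer.lock().unwrap().send(&rec).unwrap();
                info!("Block event forwarded to Kafka");
            })
            .await;
            ev
        }
    })
}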
31 changes: 14 additions & 17 deletions src/main.rs
@@ -14,8 +14,8 @@ use isahc::{prelude::*, HttpClient};
use serde::Deserialize;
use std::pin::Pin;
use std::sync::{Arc, Once};
+use std::str::FromStr;
use std::time::Duration;

use futures::StreamExt;
use kafka::producer::{Producer, RequiredAcks};

@@ -30,6 +30,7 @@ use spectrum_offchain::event_sink::process_events;
#[tokio::main]
async fn main() {
let args = AppArgs::parse();
+    println!("args {:?}", args.node_addr);
let raw_config =
std::fs::read_to_string(args.config_yaml_path).expect("Cannot load configuration file");
let config: AppConfig = serde_yaml::from_str(&raw_config).expect("Invalid configuration file");
@@ -40,13 +41,16 @@ async fn main() {
log4rs::init_file(config.log4rs_yaml_path, Default::default()).unwrap();
}
let client = HttpClient::builder()
-        .timeout(std::time::Duration::from_secs(
+        .timeout(Duration::from_secs(
config.http_client_timeout_duration_secs as u64,
))
.build()
.unwrap();

-    let node = ErgoNodeHttpClient::new(client, config.node_addr.clone());
+    let node_addr = Url::from_str(args.node_addr.clone().as_str()).unwrap();
+
+    let node = ErgoNodeHttpClient::new(client, node_addr);
let cache = ChainCacheRocksDB::new(RocksConfig {
db_path: config.chain_cache_db_path.into(),
});
@@ -57,16 +61,11 @@
cache,
Some(&SIGNAL_TIP_REACHED),
)
-    .await;
+        .await;
let cache_mempool = ChainCacheRocksDB::new(RocksConfig {
db_path: config.mempool_cache_db_path.into(),
});

-    let producer1 = Producer::from_hosts(vec![config.kafka_address.to_owned()])
-        .with_ack_timeout(Duration::from_secs(1))
-        .with_required_acks(RequiredAcks::One)
-        .create()
-        .unwrap();
let producer2 = Producer::from_hosts(vec![config.kafka_address.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
@@ -82,19 +81,17 @@

let mempool_sync = mempool_sync_stream(
MempoolSyncConf {
-            sync_interval: Duration::from_secs(config.mempool_sync_interval),
+            sync_interval: Duration::from_millis(config.mempool_sync_interval),
},
mempool_chain_sync,
&node,
)
-    .await;
+        .await;

let mempool_source =
mempool_event_source(mempool_sync, producer3, config.mempool_topic.to_string());
let event_source = tx_event_source(block_event_source(
-        chain_sync_stream(chain_sync),
-        producer1,
-        config.blocks_topic.to_string(),
+        chain_sync_stream(chain_sync)
));
let handler = ProxyEvents::new(
Arc::new(std::sync::Mutex::new(producer2)),
@@ -114,14 +111,12 @@

#[derive(Deserialize)]
struct AppConfig<'a> {
-    node_addr: Url,
http_client_timeout_duration_secs: u32,
chain_sync_starting_height: u32,
log4rs_yaml_path: &'a str,
chain_cache_db_path: &'a str,
mempool_cache_db_path: &'a str,
kafka_address: &'a str,
-    blocks_topic: &'a str,
tx_topic: &'a str,
mempool_topic: &'a str,
mempool_sync_interval: u64,
@@ -138,8 +133,10 @@ struct AppArgs {
/// Optional path to the log4rs YAML configuration file. NOTE: overrides path specified in config YAML file.
#[arg(long, short)]
log4rs_path: Option<String>,
+    #[arg(long, short)]
+    node_addr: String,
}

-pub fn boxed<'a, T>(s: impl Stream<Item = T> + 'a) -> Pin<Box<dyn Stream<Item = T> + 'a>> {
+pub fn boxed<'a, T>(s: impl Stream<Item=T> + 'a) -> Pin<Box<dyn Stream<Item=T> + 'a>> {
Box::pin(s)
}
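Usage note: with this change the node address comes from the CLI instead of the config file, matching the Dockerfile CMD above, e.g. events-streaming --config-yaml-path=conf/config.yml --node-addr=http://<node-host>:9053 (the host is only a placeholder). One small caveat: Url::from_str(...).unwrap() panics with a generic message if the flag is malformed; an illustrative, slightly friendlier variant, not part of the PR, would be:

// Illustrative only; uses the same Url type main.rs already imports.
let node_addr = Url::from_str(args.node_addr.as_str())
    .expect("malformed --node-addr value; expected a full URL such as http://host:9053");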