diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 049c5470..80b19f61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -149,14 +149,6 @@ jobs: username: "runner" env: "{}" - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - with: - install: true - - - name: Verify Docker installation - run: docker ps - # Rest of the steps remain the same - name: Run anvil in background run: anvil --version && anvil & diff --git a/.github/workflows/docker-desktop/action.yml b/.github/workflows/docker-desktop/action.yml index ea8fcd79..030c7625 100644 --- a/.github/workflows/docker-desktop/action.yml +++ b/.github/workflows/docker-desktop/action.yml @@ -150,6 +150,9 @@ runs: - name: Wait for Docker to be up and running shell: bash run: | + if [[ "$RUNNER_OS" == "macOS" ]]; then + export DOCKER_HOST="unix://${HOME}/.docker/desktop/docker.sock" + fi until docker ps; do echo "docker not ready, sleep 10 s and try again"; sleep 10; done echo "Docker started and ready" docker version \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 033b6a0b..6ad9d0ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4180,7 +4180,7 @@ dependencies = [ ] [[package]] -name = "example-blueprint" +name = "examples" version = "0.1.1" dependencies = [ "blueprint-metadata", diff --git a/Cargo.toml b/Cargo.toml index 5ed84c50..3d21a103 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ members = [ "blueprints/incredible-squaring", "blueprints/incredible-squaring-eigenlayer", "blueprints/incredible-squaring-symbiotic", - "blueprints/example-blueprint", + "blueprints/examples", "cli", "gadget-io", "blueprint-test-utils", @@ -51,9 +51,9 @@ blueprint-test-utils = { path = "./blueprint-test-utils" } gadget-sdk = { path = "./sdk", default-features = false, version = "0.3.0" } incredible-squaring-blueprint = { path = "./blueprints/incredible-squaring", default-features = false, version = "0.1.1" } -incredible-squaring-blueprint-eigenlayer = { path = "./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" } +incredible-squaring-blueprint-eigenlayer = { path = s"./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" } incredible-squaring-blueprint-symbiotic = { path = "./blueprints/incredible-squaring-symbiotic", default-features = false, version = "0.1.1" } -example-blueprint = { path = "./blueprints/example-blueprint", default-features = false, version = "0.1.1" } +examples = { path = "./blueprints/examples", default-features = false, version = "0.1.1" } gadget-blueprint-proc-macro = { path = "./macros/blueprint-proc-macro", default-features = false, version = "0.3.0" } gadget-blueprint-proc-macro-core = { path = "./macros/blueprint-proc-macro-core", default-features = false, version = "0.1.5" } gadget-context-derive = { path = "./macros/context-derive", default-features = false, version = "0.2.0" } diff --git a/blueprints/example-blueprint/Cargo.toml b/blueprints/examples/Cargo.toml similarity index 95% rename from blueprints/example-blueprint/Cargo.toml rename to blueprints/examples/Cargo.toml index 43aa7af7..31faf49e 100644 --- a/blueprints/example-blueprint/Cargo.toml +++ b/blueprints/examples/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "example-blueprint" +name = "examples" version = "0.1.1" description = "A variety of example blueprint jobs, event listeners, contexts, and more." authors.workspace = true diff --git a/blueprints/example-blueprint/build.rs b/blueprints/examples/build.rs similarity index 100% rename from blueprints/example-blueprint/build.rs rename to blueprints/examples/build.rs diff --git a/blueprints/example-blueprint/src/examples/mod.rs b/blueprints/examples/src/examples/mod.rs similarity index 62% rename from blueprints/example-blueprint/src/examples/mod.rs rename to blueprints/examples/src/examples/mod.rs index 54d9a0a3..c3b6b4a0 100644 --- a/blueprints/example-blueprint/src/examples/mod.rs +++ b/blueprints/examples/src/examples/mod.rs @@ -1,2 +1,3 @@ pub mod periodic_web_poller; pub mod raw_tangle_events; +pub mod sequential_event_listener; \ No newline at end of file diff --git a/blueprints/example-blueprint/src/examples/periodic_web_poller.rs b/blueprints/examples/src/examples/periodic_web_poller.rs similarity index 100% rename from blueprints/example-blueprint/src/examples/periodic_web_poller.rs rename to blueprints/examples/src/examples/periodic_web_poller.rs diff --git a/blueprints/example-blueprint/src/examples/raw_tangle_events.rs b/blueprints/examples/src/examples/raw_tangle_events.rs similarity index 100% rename from blueprints/example-blueprint/src/examples/raw_tangle_events.rs rename to blueprints/examples/src/examples/raw_tangle_events.rs diff --git a/blueprints/examples/src/examples/sequential_event_listener.rs b/blueprints/examples/src/examples/sequential_event_listener.rs new file mode 100644 index 00000000..234bd685 --- /dev/null +++ b/blueprints/examples/src/examples/sequential_event_listener.rs @@ -0,0 +1,247 @@ +use crate::event_listener::EventListener; +use crate::store::LocalDatabase; +use crate::{error, Error}; +use alloy_contract::ContractInstance; +use alloy_contract::Event; +use alloy_provider::Provider; +use alloy_rpc_types::{BlockNumberOrTag, Filter, Log}; +use alloy_sol_types::SolEvent; +use std::collections::{HashMap, VecDeque}; +use std::marker::PhantomData; +use std::time::Duration; +use uuid::Uuid; + +const EXPIRY_BLOCKS: u64 = 1000; + +/// Trait for correlating sequential events +pub trait EventCorrelation { + /// Returns true if the two events are correlated in sequence + fn are_correlated(&self, first: &E1, second: &E2) -> bool; +} + +/// Macro to create a sequential event listener with N events +#[macro_export] +macro_rules! sequential_event_listener { + ($name:ident, $($event:ident),+) => { + pub struct $name<$($event),+> + where + $($event: SolEvent + Send + Sync + 'static),+ + { + instance: AlloyContractInstance, + chain_id: u64, + local_db: LocalDatabase, + should_cooldown: bool, + pending_sequences: HashMap>, + completed_sequences: VecDeque<($(($event, Log)),+)>, + correlators: Vec + Send + Sync>>, + _phantom: PhantomData<($($event),+)>, + } + + struct EventSequence<$($event),+> { + $( + $event: Option<($event, Log)>, + )+ + sequence_id: String, + last_update: u64, + } + + impl<$($event),+> EventSequence<$($event),+> { + fn new(sequence_id: String) -> Self { + Self { + $( + $event: None, + )+ + sequence_id, + last_update: 0, + } + } + + fn is_complete(&self) -> bool { + $(self.$event.is_some())&&+ + } + } + + #[async_trait::async_trait] + impl<$($event),+> EventListener<($(($event, Log)),+), AlloyContractInstance> + for $name<$($event),+> + where + $($event: SolEvent + Send + Sync + 'static),+ + { + async fn new(context: &AlloyContractInstance) -> Result + where + Self: Sized, + { + let provider = context.provider().root(); + let chain_id = provider + .get_chain_id() + .await + .map_err(|err| Error::Client(format!("Failed to get chain ID: {}", err)))?; + + let local_db = LocalDatabase::open(format!("./db/{}", Uuid::new_v4())); + + Ok(Self { + instance: context.clone(), + chain_id, + local_db, + should_cooldown: false, + pending_sequences: HashMap::new(), + completed_sequences: VecDeque::new(), + correlators: Vec::new(), + _phantom: PhantomData, + }) + } + + async fn next_event(&mut self) -> Option<($(($event, Log)),+)> { + if let Some(sequence) = self.completed_sequences.pop_front() { + return Some(sequence); + } + + if self.should_cooldown { + tokio::time::sleep(Duration::from_millis(5000)).await; + self.should_cooldown = false; + } + + let contract = &self.instance; + let step = 100; + let target_block_number: u64 = contract + .provider() + .get_block_number() + .await + .unwrap_or_default(); + + let block = self + .local_db + .get(&format!("LAST_BLOCK_NUMBER_{}", contract.address())) + .unwrap_or(0); + + let should_cooldown = block >= target_block_number; + if should_cooldown { + self.should_cooldown = true; + return self.next_event().await; + } + + let dest_block = core::cmp::min(block + step, target_block_number); + + // Query all event types + let mut all_events = Vec::new(); + $( + all_events.extend(self.query_events::<$event>(block + 1, dest_block).await); + )+ + + // Sort events by block number and index + all_events.sort_by(|a, b| { + let block_cmp = a.block_number.cmp(&b.block_number); + if block_cmp == std::cmp::Ordering::Equal { + a.transaction_index.cmp(&b.transaction_index) + } else { + block_cmp + } + }); + + // Process events and update sequences + for event in all_events { + self.process_event(event); + } + + // Update block tracking + self.local_db.set( + &format!("LAST_BLOCK_NUMBER_{}", contract.address()), + dest_block, + ); + + self.local_db.set( + &format!("TARGET_BLOCK_{}", contract.address()), + target_block_number, + ); + + // Clean up expired sequences + self.cleanup_expired_sequences(target_block_number); + + self.completed_sequences.pop_front() + } + } + + impl<$($event),+> $name<$($event),+> + where + $($event: SolEvent + Send + Sync + 'static),+ + { + pub fn with_correlators(mut self, correlators: Vec + Send + Sync>>) -> Self { + self.correlators = correlators; + self + } + + async fn query_events(&self, from_block: u64, to_block: u64) -> Vec { + let events_filter = Event::new(self.instance.provider(), Filter::new()) + .address(*self.instance.address()) + .from_block(BlockNumberOrTag::Number(from_block)) + .to_block(BlockNumberOrTag::Number(to_block)) + .event_signature(E::SIGNATURE_HASH); + + match events_filter.query().await { + Ok(events) => events, + Err(e) => { + error!(?e, %self.chain_id, "Error querying events"); + Vec::new() + } + } + } + + fn process_event(&mut self, log: Log) { + // Try to decode the event into each possible type + $( + if let Ok(event) = $event::decode_log(&log) { + // Check existing sequences for correlation + for sequence in self.pending_sequences.values_mut() { + if self.events_correlate(&sequence, &event) { + sequence.$event = Some((event.clone(), log.clone())); + sequence.last_update = log.block_number.unwrap_or_default(); + + if sequence.is_complete() { + if let Some(seq) = self.pending_sequences.remove(&sequence.sequence_id) { + let completed = ( + $( + seq.$event.unwrap(), + )+ + ); + self.completed_sequences.push_back(completed); + } + } + return; + } + } + + // If no correlation found, create new sequence + let sequence_id = Uuid::new_v4().to_string(); + let mut new_sequence = EventSequence::new(sequence_id.clone()); + new_sequence.$event = Some((event, log.clone())); + new_sequence.last_update = log.block_number.unwrap_or_default(); + self.pending_sequences.insert(sequence_id, new_sequence); + } + )+ + } + + fn events_correlate(&self, sequence: &EventSequence<$($event),+>, new_event: &E2) -> bool { + for correlator in &self.correlators { + // Check correlation between consecutive events in the sequence + if let Some((prev_event, _)) = &sequence.E1 { + if correlator.are_correlated(prev_event, new_event) { + return true; + } + } + } + false + } + + fn cleanup_expired_sequences(&mut self, current_block: u64) { + self.pending_sequences.retain(|_, seq| { + current_block - seq.last_update < EXPIRY_BLOCKS + }); + } + } + }; +} + +// Example usage: +// sequential_event_listener!(ThreeEventListener, Event1, Event2, Event3); +// sequential_event_listener!(TwoEventListener, Event1, Event2); +// sequential_event_listener!(FourEventListener, Event1, Event2, Event3, Event4); diff --git a/blueprints/example-blueprint/src/lib.rs b/blueprints/examples/src/lib.rs similarity index 100% rename from blueprints/example-blueprint/src/lib.rs rename to blueprints/examples/src/lib.rs diff --git a/blueprints/example-blueprint/src/main.rs b/blueprints/examples/src/main.rs similarity index 100% rename from blueprints/example-blueprint/src/main.rs rename to blueprints/examples/src/main.rs