Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into donovan/eigen-blueprint-test
Browse files Browse the repository at this point in the history
# Conflicts:
#	blueprint-test-utils/Cargo.toml
#	blueprints/incredible-squaring/src/eigenlayer.rs
  • Loading branch information
Tjemmmic committed Sep 24, 2024
2 parents a25d108 + c41f864 commit 0bf0dbb
Show file tree
Hide file tree
Showing 48 changed files with 391 additions and 611 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 34 additions & 58 deletions blueprint-manager/src/executor/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use gadget_io::GadgetConfig;
use gadget_sdk::clients::tangle::runtime::{TangleConfig, TangleEvent};
use gadget_sdk::clients::tangle::services::{RpcServicesWithBlueprint, ServicesClient};
use gadget_sdk::config::Protocol;
use gadget_sdk::logger::Logger;
use gadget_sdk::{error, info, trace, warn};
use std::fmt::Debug;
use std::sync::atomic::Ordering;
use tangle_subxt::subxt::utils::AccountId32;
Expand Down Expand Up @@ -40,19 +40,17 @@ pub async fn handle_services<'a>(
gadget_config: &GadgetConfig,
blueprint_manager_opts: &BlueprintManagerConfig,
active_gadgets: &mut ActiveGadgets,
logger: &Logger,
) -> color_eyre::Result<()> {
for blueprint in blueprints {
if let Err(err) = crate::sources::handle(
blueprint,
gadget_config,
blueprint_manager_opts,
active_gadgets,
logger,
)
.await
{
logger.error(err)
error!("{err}");
}
}

Expand All @@ -68,7 +66,6 @@ pub struct EventPollResult {

pub(crate) async fn check_blueprint_events(
event: &TangleEvent,
logger: &Logger,
active_gadgets: &mut ActiveGadgets,
account_id: &AccountId32,
) -> EventPollResult {
Expand All @@ -86,11 +83,11 @@ pub(crate) async fn check_blueprint_events(
Ok(evt) => {
if &evt.operator == account_id {
result.blueprint_registrations.push(evt.blueprint_id);
logger.info(format!("Pre-registered event: {evt:?}"));
info!("Pre-registered event: {evt:?}");
}
}
Err(err) => {
logger.warn(format!("Error handling pre-registered event: {err:?}"));
warn!("Error handling pre-registered event: {err:?}");
}
}
}
Expand All @@ -99,11 +96,11 @@ pub(crate) async fn check_blueprint_events(
for evt in registered_events {
match evt {
Ok(evt) => {
logger.info(format!("Registered event: {evt:?}"));
info!("Registered event: {evt:?}");
result.needs_update = true;
}
Err(err) => {
logger.warn(format!("Error handling registered event: {err:?}"));
warn!("Error handling registered event: {err:?}");
}
}
}
Expand All @@ -112,19 +109,16 @@ pub(crate) async fn check_blueprint_events(
for evt in unregistered_events {
match evt {
Ok(evt) => {
logger.info(format!("Unregistered event: {evt:?}"));
info!("Unregistered event: {evt:?}");
if &evt.operator == account_id && active_gadgets.remove(&evt.blueprint_id).is_some()
{
logger.info(format!(
"Removed services for blueprint_id: {}",
evt.blueprint_id,
));
info!("Removed services for blueprint_id: {}", evt.blueprint_id,);

result.needs_update = true;
}
}
Err(err) => {
logger.warn(format!("Error handling unregistered event: {err:?}"));
warn!("Error handling unregistered event: {err:?}");
}
}
}
Expand All @@ -133,10 +127,10 @@ pub(crate) async fn check_blueprint_events(
for evt in service_initiated_events {
match evt {
Ok(evt) => {
logger.info(format!("Service initiated event: {evt:?}"));
info!("Service initiated event: {evt:?}");
}
Err(err) => {
logger.warn(format!("Error handling service initiated event: {err:?}"));
warn!("Error handling service initiated event: {err:?}");
}
}
}
Expand All @@ -145,10 +139,10 @@ pub(crate) async fn check_blueprint_events(
for evt in job_called_events {
match evt {
Ok(evt) => {
logger.info(format!("Job called event: {evt:?}"));
info!("Job called event: {evt:?}");
}
Err(err) => {
logger.warn(format!("Error handling job called event: {err:?}"));
warn!("Error handling job called event: {err:?}");
}
}
}
Expand All @@ -157,12 +151,10 @@ pub(crate) async fn check_blueprint_events(
for evt in job_result_submitted_events {
match evt {
Ok(evt) => {
logger.info(format!("Job result submitted event: {evt:?}"));
info!("Job result submitted event: {evt:?}");
}
Err(err) => {
logger.warn(format!(
"Error handling job result submitted event: {err:?}"
));
warn!("Error handling job result submitted event: {err:?}");
}
}
}
Expand All @@ -174,16 +166,15 @@ pub(crate) async fn check_blueprint_events(
pub(crate) async fn handle_tangle_event(
event: &TangleEvent,
blueprints: &[RpcServicesWithBlueprint],
logger: &Logger,
gadget_config: &GadgetConfig,
gadget_manager_opts: &BlueprintManagerConfig,
active_gadgets: &mut ActiveGadgets,
poll_result: EventPollResult,
client: &ServicesClient<TangleConfig>,
) -> color_eyre::Result<()> {
logger.trace(format!("Received notification {}", event.number));
info!("Received notification {}", event.number);
const DEFAULT_PROTOCOL: Protocol = Protocol::Tangle;
logger.warn("Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here");
warn!("Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here");

let mut registration_blueprints = vec![];
// First, check to see if we need to register any new services invoked by the PreRegistration event
Expand Down Expand Up @@ -232,7 +223,6 @@ pub(crate) async fn handle_tangle_event(
let fetcher = GithubBinaryFetcher {
fetcher: gh.clone(),
blueprint_id: blueprint.blueprint_id,
logger,
gadget_name: blueprint.name.clone(),
};

Expand All @@ -242,14 +232,13 @@ pub(crate) async fn handle_tangle_event(
GadgetSourceFetcher::Testing(test) => {
// TODO: demote to TRACE once proven to work
if !gadget_manager_opts.test_mode {
logger.warn("Ignoring testing fetcher as we are not in test mode");
warn!("Ignoring testing fetcher as we are not in test mode");
continue;
}

let fetcher = crate::sources::testing::TestSourceFetcher {
fetcher: test.clone(),
blueprint_id: blueprint.blueprint_id,
logger,
gadget_name: blueprint.name.clone(),
};

Expand All @@ -258,7 +247,7 @@ pub(crate) async fn handle_tangle_event(
}

_ => {
logger.warn("Blueprint does not contain a supported fetcher");
warn!("Blueprint does not contain a supported fetcher");
continue;
}
}
Expand All @@ -268,19 +257,16 @@ pub(crate) async fn handle_tangle_event(

// Ensure that we have at least one fetcher
if fetcher_candidates.is_empty() {
logger.warn(format!(
"No fetchers found for blueprint: {}",
blueprint.name,
));
warn!("No fetchers found for blueprint: {}", blueprint.name,);
continue;
}

// Ensure that we have a test fetcher if we are in test mode
if gadget_manager_opts.test_mode && test_fetcher_idx.is_none() {
logger.warn(format!(
warn!(
"No testing fetcher found for blueprint `{}` despite operating in TEST MODE",
blueprint.name,
));
);
continue;
}

Expand All @@ -292,10 +278,10 @@ pub(crate) async fn handle_tangle_event(

// Ensure there is only a single candidate fetcher
if fetcher_candidates.len() != 1 {
logger.warn(format!(
warn!(
"Multiple fetchers found for blueprint: {}. Invalidating blueprint",
blueprint.name,
));
);
continue;
}

Expand All @@ -306,26 +292,24 @@ pub(crate) async fn handle_tangle_event(

verified_blueprints.push(verified_blueprint);
} else {
logger
.warn("Blueprint does not contain a native gadget and thus currently unsupported");
warn!("Blueprint does not contain a native gadget and thus currently unsupported");
}
}

logger.trace(format!(
trace!(
"OnChain Verified Blueprints: {:?}",
verified_blueprints
.iter()
.map(|r| format!("{r:?}"))
.collect::<Vec<_>>()
));
);

// Step 3: Check to see if we need to start any new services
handle_services(
&verified_blueprints,
gadget_config,
gadget_manager_opts,
active_gadgets,
logger,
)
.await?;

Expand All @@ -335,9 +319,9 @@ pub(crate) async fn handle_tangle_event(
// Loop through every (blueprint_id, service_id) running. See if the service is still on-chain. If not, kill it and add it to to_remove
for (blueprint_id, process_handles) in &mut *active_gadgets {
for service_id in process_handles.keys() {
logger.info(format!(
info!(
"Checking service for on-chain termination: bid={blueprint_id}//sid={service_id}"
));
);

// Since the below "verified blueprints" were freshly obtained from an on-chain source,
// we compare all these fresh values to see if we're running a service locally that is no longer on-chain
Expand All @@ -346,9 +330,7 @@ pub(crate) async fn handle_tangle_event(
// Safe assertion since we know there is at least one fetcher. All fetchers should have the same blueprint id
let fetcher = &verified_blueprints.fetcher;
if fetcher.blueprint_id() == *blueprint_id && !services.contains(service_id) {
logger.warn(format!(
"Killing service that is no longer on-chain: bid={blueprint_id}//sid={service_id}",
));
warn!("Killing service that is no longer on-chain: bid={blueprint_id}//sid={service_id}");
to_remove.push((*blueprint_id, *service_id));
}
}
Expand All @@ -362,28 +344,22 @@ pub(crate) async fn handle_tangle_event(
&& !process_handle.0.load(Ordering::Relaxed)
{
// By removing any killed processes, we will auto-restart them on the next finality notification if required
logger.warn("Killing service that has died to allow for auto-restart");
warn!("Killing service that has died to allow for auto-restart");
to_remove.push((*blueprint_id, *service_id));
}
}
}

for (blueprint_id, service_id) in to_remove {
logger.warn(format!(
"Removing service that is no longer active on-chain or killed: bid={blueprint_id}//sid={service_id}",
));
warn!("Removing service that is no longer active on-chain or killed: bid={blueprint_id}//sid={service_id}");
let mut should_delete_blueprint = false;
if let Some(gadgets) = active_gadgets.get_mut(&blueprint_id) {
if let Some((_, mut process_handle)) = gadgets.remove(&service_id) {
if let Some(abort_handle) = process_handle.take() {
if abort_handle.send(()).is_err() {
logger.error(format!(
"Failed to send abort signal to service: bid={blueprint_id}//sid={service_id}",
));
error!("Failed to send abort signal to service: bid={blueprint_id}//sid={service_id}");
} else {
logger.warn(format!(
"Sent abort signal to service: bid={blueprint_id}//sid={service_id}",
));
warn!("Sent abort signal to service: bid={blueprint_id}//sid={service_id}");
}
}
}
Expand Down
Loading

0 comments on commit 0bf0dbb

Please sign in to comment.