Skip to content

Commit

Permalink
Add v2 endpoint to support old and new version event producers (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Mar 15, 2024
1 parent d364d51 commit f97d15e
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 8 deletions.
11 changes: 11 additions & 0 deletions rs/canister/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ pub struct IdempotentEvent {
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct IdempotentEventPrevious {
pub idempotency_key: u128,
pub name: String,
pub timestamp: TimestampMillis,
pub user: Option<String>,
pub source: Option<String>,
#[serde(with = "serde_bytes")]
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct IndexedEvent {
pub index: u64,
Expand Down
2 changes: 2 additions & 0 deletions rs/canister/api/src/updates/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod push_events;
mod push_events_v2;

pub use push_events::*;
pub use push_events_v2::*;
6 changes: 3 additions & 3 deletions rs/canister/api/src/updates/push_events.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::IdempotentEvent;
use crate::IdempotentEventPrevious;
use candid::{CandidType, Deserialize};
use serde::Serialize;

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct PushEventsArgs {
pub events: Vec<IdempotentEvent>,
pub struct PushEventsArgsPrevious {
pub events: Vec<IdempotentEventPrevious>,
}
8 changes: 8 additions & 0 deletions rs/canister/api/src/updates/push_events_v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::IdempotentEvent;
use candid::{CandidType, Deserialize};
use serde::Serialize;

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct PushEventsArgs {
pub events: Vec<IdempotentEvent>,
}
25 changes: 23 additions & 2 deletions rs/canister/impl/src/updates/push_events.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
use crate::guards::caller_can_push_events;
use crate::{env, state};
use event_store_canister::PushEventsArgs;
use event_store_canister::{Anonymizable, IdempotentEvent, PushEventsArgs, PushEventsArgsPrevious};
use ic_cdk::update;

#[update(guard = "caller_can_push_events")]
fn push_events(args: PushEventsArgs) {
fn push_events(args: PushEventsArgsPrevious) {
let now = env::time();

state::mutate(|s| {
for event in args.events {
s.push_event(
IdempotentEvent {
idempotency_key: event.idempotency_key,
name: event.name,
timestamp: event.timestamp,
user: event.user.map(Anonymizable::Public),
source: event.source.map(Anonymizable::Public),
payload: event.payload,
},
now,
);
}
});
}

#[update(guard = "caller_can_push_events")]
fn push_events_v2(args: PushEventsArgs) {
let now = env::time();

state::mutate(|s| {
Expand Down
2 changes: 1 addition & 1 deletion rs/integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn push_events(
canister_id: Principal,
args: &PushEventsArgs,
) {
execute_update_no_response(env, sender, canister_id, "push_events", args)
execute_update_no_response(env, sender, canister_id, "push_events_v2", args)
}

fn execute_query<P: CandidType, R: CandidType + DeserializeOwned>(
Expand Down
2 changes: 1 addition & 1 deletion rs/producer/agent_runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn flush_async<F: FnOnce(FlushOutcome) + Send + 'static>(
on_complete: F,
) {
if agent
.update(&canister_id, "push_events".to_string())
.update(&canister_id, "push_events_v2".to_string())
.with_arg(candid::encode_one(PushEventsArgs { events }).unwrap())
.call_and_wait()
.await
Expand Down
2 changes: 1 addition & 1 deletion rs/producer/cdk_runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn flush_async<F: FnOnce(FlushOutcome)>(
) {
let events_len = events.len();
if let Err(error) =
ic_cdk::call::<_, ()>(canister_id, "push_events", (PushEventsArgs { events },)).await
ic_cdk::call::<_, ()>(canister_id, "push_events_v2", (PushEventsArgs { events },)).await
{
on_complete(FLUSH_OUTCOME_FAILED_SHOULD_RETRY);
error!(%canister_id, events = events_len, ?error, "Failed to call 'push_events'");
Expand Down

0 comments on commit f97d15e

Please sign in to comment.