Commit

Workflow replayer support

Fixes #187

cretz committed Jan 23, 2025
1 parent 17cb2b5 commit c7b1789
Showing 21 changed files with 976 additions and 154 deletions.
44 changes: 43 additions & 1 deletion README.md
@@ -62,6 +62,7 @@ until the SDK is marked stable.
- [Activity Worker Shutdown](#activity-worker-shutdown)
- [Activity Concurrency and Executors](#activity-concurrency-and-executors)
- [Activity Testing](#activity-testing)
- [Ractors](#ractors)
- [Platform Support](#platform-support)
- [Development](#development)
- [Build](#build)
@@ -853,7 +854,48 @@ the mock activity class to make it appear as the real name.

#### Workflow Replay

TODO: Workflow replayer not yet implemented
A workflow's history can be replayed locally to check for problems such as non-determinism errors. For example,
assuming the `history_json` parameter below is given a JSON string of history exported from the CLI or web UI, the
following function will replay it:

```ruby
def replay_from_json(history_json)
  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
  replayer.replay_workflow(Temporalio::WorkflowHistory.from_history_json(history_json))
end
```

If there is a non-determinism error, this will raise an exception by default.
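
For instance, here is a minimal sketch of surfacing that failure; it is hypothetical usage of the function above and assumes only that the raised error is a `StandardError` subclass:

```ruby
# Hypothetical sketch: report a replay failure rather than letting it crash.
begin
  replay_from_json(history_json)
rescue StandardError => e
  warn "Replay failed (possible non-determinism): #{e.message}"
end
```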

Workflow history can be loaded from more than just JSON. It can be fetched individually from a workflow handle, or even
in a list. For example, the following code checks that all workflow histories for a certain workflow type (i.e.,
workflow class) replay safely against the current workflow code.

```ruby
def check_past_histories(client)
  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
  results = replayer.replay_workflows(client.list_workflows("WorkflowType = 'MyWorkflow'").map do |desc|
    client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history
  end)
  results.each { |res| raise res.replay_failure if res.replay_failure }
end
```

This only raises at the end, however, because `replay_workflows`, unlike `replay_workflow`, does not raise on failure
by default. Instead, the `raise_on_replay_failure: true` parameter can be set (sketched after the next example), or the
replay worker can be used to process each history as it is fetched, like so:

```ruby
def check_past_histories(client)
  Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) do |worker|
    client.list_workflows("WorkflowType = 'MyWorkflow'").each do |desc|
      worker.replay_workflow(client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history)
    end
  end
end
```
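
As a sketch of the `raise_on_replay_failure` option mentioned above (assuming it is accepted as a keyword argument to `replay_workflows`), the same check can stop at the first failing history:

```ruby
def check_past_histories!(client)
  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
  histories = client.list_workflows("WorkflowType = 'MyWorkflow'").map do |desc|
    client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history
  end
  # Raises on the first replay failure instead of collecting results.
  replayer.replay_workflows(histories, raise_on_replay_failure: true)
end
```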

See the `WorkflowReplayer` API documentation for more details.

### Activities

1 change: 1 addition & 0 deletions temporalio/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions temporalio/ext/Cargo.toml
@@ -20,6 +20,7 @@ temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.26"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.12"
tracing = "0.1"
181 changes: 120 additions & 61 deletions temporalio/ext/src/worker.rs
@@ -8,7 +8,7 @@ use std::{
use crate::{
    client::Client,
    enter_sync, error, id, new_error,
    runtime::{AsyncCommand, RuntimeHandle},
    runtime::{AsyncCommand, Runtime, RuntimeHandle},
    util::{AsyncCallback, Struct},
    ROOT_MOD,
};
@@ -20,15 +20,19 @@ use magnus::{
};
use prost::Message;
use temporal_sdk_core::{
    replay::{HistoryForReplay, ReplayWorkerInput},
    ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
    SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfigBuilder,
    SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder,
};
use temporal_sdk_core_api::{
    errors::{PollError, WorkflowErrorType},
    worker::SlotKind,
};
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;

pub fn init(ruby: &Ruby) -> Result<(), Error> {
    let class = ruby
@@ -55,6 +59,11 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
    )?;
    class.define_method("replace_client", method!(Worker::replace_client, 1))?;
    class.define_method("initiate_shutdown", method!(Worker::initiate_shutdown, 0))?;

    let inner_class = class.define_class("WorkflowReplayer", class::object())?;
    inner_class.define_singleton_method("new", function!(WorkflowReplayer::new, 2))?;
    inner_class.define_method("push_history", method!(WorkflowReplayer::push_history, 2))?;

    Ok(())
}

@@ -89,70 +98,13 @@ impl Worker {
        let activity = options.member::<bool>(id!("activity"))?;
        let workflow = options.member::<bool>(id!("workflow"))?;

        // Build config
        let config = WorkerConfigBuilder::default()
            .namespace(options.member::<String>(id!("namespace"))?)
            .task_queue(options.member::<String>(id!("task_queue"))?)
            .worker_build_id(options.member::<String>(id!("build_id"))?)
            .client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
            .max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
            .max_concurrent_wft_polls(
                options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
            )
            .nonsticky_to_sticky_poll_ratio(
                options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
            )
            .max_concurrent_at_polls(
                options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
            )
            .no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
            .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
                options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
            ))
            .max_heartbeat_throttle_interval(Duration::from_secs_f64(
                options.member(id!("max_heartbeat_throttle_interval"))?,
            ))
            .default_heartbeat_throttle_interval(Duration::from_secs_f64(
                options.member(id!("default_heartbeat_throttle_interval"))?,
            ))
            .max_worker_activities_per_second(
                options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
            )
            .max_task_queue_activities_per_second(
                options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
            )
            .graceful_shutdown_period(Duration::from_secs_f64(
                options.member(id!("graceful_shutdown_period"))?,
            ))
            .use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
            .tuner(Arc::new(build_tuner(
                options
                    .child(id!("tuner"))?
                    .ok_or_else(|| error!("Missing tuner"))?,
            )?))
            .workflow_failure_errors(
                if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
                    HashSet::from([WorkflowErrorType::Nondeterminism])
                } else {
                    HashSet::new()
                },
            )
            .workflow_types_to_failure_errors(
                options
                    .member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
                    .into_iter()
                    .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
                    .collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
            )
            .build()
            .map_err(|err| error!("Invalid worker options: {}", err))?;

        let worker = temporal_sdk_core::init_worker(
            &client.runtime_handle.core,
            config,
            build_config(options)?,
            client.core.clone().into_inner(),
        )
        .map_err(|err| error!("Failed creating worker: {}", err))?;

        Ok(Worker {
            core: RefCell::new(Some(Arc::new(worker))),
            runtime_handle: client.runtime_handle.clone(),
@@ -435,6 +387,113 @@
    }
}

#[derive(DataTypeFunctions, TypedData)]
#[magnus(
    class = "Temporalio::Internal::Bridge::Worker::WorkflowReplayer",
    free_immediately
)]
pub struct WorkflowReplayer {
    tx: Sender<HistoryForReplay>,
    runtime_handle: RuntimeHandle,
}

impl WorkflowReplayer {
    pub fn new(runtime: &Runtime, options: Struct) -> Result<(Self, Worker), Error> {
        enter_sync!(runtime.handle.clone());

        let (tx, rx) = channel(1);

        let core_worker = temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(
            build_config(options)?,
            ReceiverStream::new(rx),
        ))
        .map_err(|err| error!("Failed creating worker: {}", err))?;

        Ok((
            WorkflowReplayer {
                tx,
                runtime_handle: runtime.handle.clone(),
            },
            Worker {
                core: RefCell::new(Some(Arc::new(core_worker))),
                runtime_handle: runtime.handle.clone(),
                activity: false,
                workflow: true,
            },
        ))
    }

    pub fn push_history(&self, workflow_id: String, proto: RString) -> Result<(), Error> {
        let history = History::decode(unsafe { proto.as_slice() })
            .map_err(|err| error!("Invalid proto: {}", err))?;
        let tx = self.tx.clone();
        self.runtime_handle.core.tokio_handle().spawn(async move {
            // Intentionally ignoring error here
            let _ = tx.send(HistoryForReplay::new(history, workflow_id)).await;
        });
        Ok(())
    }
}

fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
    WorkerConfigBuilder::default()
        .namespace(options.member::<String>(id!("namespace"))?)
        .task_queue(options.member::<String>(id!("task_queue"))?)
        .worker_build_id(options.member::<String>(id!("build_id"))?)
        .client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
        .max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
        .max_concurrent_wft_polls(
            options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
        )
        .nonsticky_to_sticky_poll_ratio(
            options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
        )
        .max_concurrent_at_polls(
            options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
        )
        .no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
        .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
            options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
        ))
        .max_heartbeat_throttle_interval(Duration::from_secs_f64(
            options.member(id!("max_heartbeat_throttle_interval"))?,
        ))
        .default_heartbeat_throttle_interval(Duration::from_secs_f64(
            options.member(id!("default_heartbeat_throttle_interval"))?,
        ))
        .max_worker_activities_per_second(
            options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
        )
        .max_task_queue_activities_per_second(
            options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
        )
        .graceful_shutdown_period(Duration::from_secs_f64(
            options.member(id!("graceful_shutdown_period"))?,
        ))
        .use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
        .tuner(Arc::new(build_tuner(
            options
                .child(id!("tuner"))?
                .ok_or_else(|| error!("Missing tuner"))?,
        )?))
        .workflow_failure_errors(
            if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
                HashSet::from([WorkflowErrorType::Nondeterminism])
            } else {
                HashSet::new()
            },
        )
        .workflow_types_to_failure_errors(
            options
                .member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
                .into_iter()
                .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
                .collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
        )
        .build()
        .map_err(|err| error!("Invalid worker options: {}", err))
}

fn build_tuner(options: Struct) -> Result<TunerHolder, Error> {
    let (workflow_slot_options, resource_slot_options) = build_tuner_slot_options(
        options