Workflow replayer support #204

Merged · 1 commit · Jan 24, 2025
44 changes: 43 additions & 1 deletion README.md
@@ -62,6 +62,7 @@ until the SDK is marked stable.
- [Activity Worker Shutdown](#activity-worker-shutdown)
- [Activity Concurrency and Executors](#activity-concurrency-and-executors)
- [Activity Testing](#activity-testing)
- [Ractors](#ractors)
Member commented:

??

Member Author replied:

I forgot to auto-generate this TOC entry when I made this section last

- [Platform Support](#platform-support)
- [Development](#development)
- [Build](#build)
@@ -853,7 +854,48 @@ the mock activity class to make it appear as the real name.

#### Workflow Replay

TODO: Workflow replayer not yet implemented
Given a workflow's history, you can replay it locally to check for issues such as non-determinism errors. For example,
assuming the `history_json` parameter below is a JSON string of history exported from the CLI or web UI, the
following function will replay it:

```ruby
def replay_from_json(history_json)
replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
replayer.replay_workflow(Temporalio::WorkflowHistory.from_history_json(history_json))
end
```

If a non-determinism is detected, this raises an exception by default.

Workflow history can be loaded from more than just JSON. It can be fetched for a single workflow via its handle, or for
many workflows at once via a list query. For example, the following code checks that all workflow histories for a
certain workflow type (i.e. workflow class) are compatible with the current workflow code.

```ruby
def check_past_histories(client)
replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
results = replayer.replay_workflows(client.list_workflows("WorkflowType = 'MyWorkflow'").map do |desc|
client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history
end)
results.each { |res| raise res.replay_failure if res.replay_failure }
end
```

Note that this only raises at the end because, unlike `replay_workflow`, `replay_workflows` does not raise on failure
by default. You can either set the `raise_on_replay_failure: true` parameter, or use the replay worker to process each
history as it is fetched:

```ruby
def check_past_histories(client)
Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) do |worker|
client.list_workflows("WorkflowType = 'MyWorkflow'").each do |desc|
worker.replay_workflow(client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history)
end
end
end
```
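The collect-then-raise pattern behind `replay_workflows` can be sketched in plain Ruby, independent of the SDK. The `ReplayResult` struct below is a hypothetical stand-in for the SDK's per-history result object, assumed only to expose a `replay_failure` accessor as used in the earlier example:

```ruby
# Hypothetical stand-in for the per-history result object returned by
# replay_workflows; the real SDK object at least exposes replay_failure
ReplayResult = Struct.new(:workflow_id, :replay_failure, keyword_init: true)

# Collect every failure instead of stopping at the first one, mirroring
# the non-raising default of replay_workflows
def failed_replays(results)
  results.select(&:replay_failure)
end

results = [
  ReplayResult.new(workflow_id: "wf-1", replay_failure: nil),
  ReplayResult.new(workflow_id: "wf-2",
                   replay_failure: RuntimeError.new("nondeterminism")),
]
failed_replays(results).map(&:workflow_id) # => ["wf-2"]
```

Collecting all failures first is useful when auditing many histories at once, since one bad history does not hide the rest.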

See the `WorkflowReplayer` API documentation for more details.

### Activities

1 change: 1 addition & 0 deletions temporalio/Cargo.lock


1 change: 1 addition & 0 deletions temporalio/ext/Cargo.toml
@@ -20,6 +20,7 @@ temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.26"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.12"
tracing = "0.1"
181 changes: 120 additions & 61 deletions temporalio/ext/src/worker.rs
@@ -8,7 +8,7 @@ use std::{
use crate::{
client::Client,
enter_sync, error, id, new_error,
runtime::{AsyncCommand, RuntimeHandle},
runtime::{AsyncCommand, Runtime, RuntimeHandle},
util::{AsyncCallback, Struct},
ROOT_MOD,
};
@@ -20,15 +20,19 @@ use magnus::{
};
use prost::Message;
use temporal_sdk_core::{
replay::{HistoryForReplay, ReplayWorkerInput},
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfigBuilder,
SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder,
};
use temporal_sdk_core_api::{
errors::{PollError, WorkflowErrorType},
worker::SlotKind,
};
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;

pub fn init(ruby: &Ruby) -> Result<(), Error> {
let class = ruby
Expand All @@ -55,6 +59,11 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
)?;
class.define_method("replace_client", method!(Worker::replace_client, 1))?;
class.define_method("initiate_shutdown", method!(Worker::initiate_shutdown, 0))?;

let inner_class = class.define_class("WorkflowReplayer", class::object())?;
inner_class.define_singleton_method("new", function!(WorkflowReplayer::new, 2))?;
inner_class.define_method("push_history", method!(WorkflowReplayer::push_history, 2))?;

Ok(())
}

@@ -89,70 +98,13 @@ impl Worker {
let activity = options.member::<bool>(id!("activity"))?;
let workflow = options.member::<bool>(id!("workflow"))?;

// Build config
let config = WorkerConfigBuilder::default()
.namespace(options.member::<String>(id!("namespace"))?)
.task_queue(options.member::<String>(id!("task_queue"))?)
.worker_build_id(options.member::<String>(id!("build_id"))?)
.client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
.max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
.max_concurrent_wft_polls(
options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
)
.nonsticky_to_sticky_poll_ratio(
options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
)
.max_concurrent_at_polls(
options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
)
.no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
.sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
))
.max_heartbeat_throttle_interval(Duration::from_secs_f64(
options.member(id!("max_heartbeat_throttle_interval"))?,
))
.default_heartbeat_throttle_interval(Duration::from_secs_f64(
options.member(id!("default_heartbeat_throttle_interval"))?,
))
.max_worker_activities_per_second(
options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
)
.max_task_queue_activities_per_second(
options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
)
.graceful_shutdown_period(Duration::from_secs_f64(
options.member(id!("graceful_shutdown_period"))?,
))
.use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
.tuner(Arc::new(build_tuner(
options
.child(id!("tuner"))?
.ok_or_else(|| error!("Missing tuner"))?,
)?))
.workflow_failure_errors(
if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
HashSet::from([WorkflowErrorType::Nondeterminism])
} else {
HashSet::new()
},
)
.workflow_types_to_failure_errors(
options
.member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
.into_iter()
.map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.build()
.map_err(|err| error!("Invalid worker options: {}", err))?;

let worker = temporal_sdk_core::init_worker(
&client.runtime_handle.core,
config,
build_config(options)?,
client.core.clone().into_inner(),
)
.map_err(|err| error!("Failed creating worker: {}", err))?;

Ok(Worker {
core: RefCell::new(Some(Arc::new(worker))),
runtime_handle: client.runtime_handle.clone(),
@@ -435,6 +387,113 @@ impl Worker {
}
}

#[derive(DataTypeFunctions, TypedData)]
#[magnus(
class = "Temporalio::Internal::Bridge::Worker::WorkflowReplayer",
free_immediately
)]
pub struct WorkflowReplayer {
tx: Sender<HistoryForReplay>,
runtime_handle: RuntimeHandle,
}

impl WorkflowReplayer {
pub fn new(runtime: &Runtime, options: Struct) -> Result<(Self, Worker), Error> {
enter_sync!(runtime.handle.clone());

let (tx, rx) = channel(1);

let core_worker = temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(
build_config(options)?,
ReceiverStream::new(rx),
))
.map_err(|err| error!("Failed creating worker: {}", err))?;

Ok((
WorkflowReplayer {
tx,
runtime_handle: runtime.handle.clone(),
},
Worker {
core: RefCell::new(Some(Arc::new(core_worker))),
runtime_handle: runtime.handle.clone(),
activity: false,
workflow: true,
},
))
}

pub fn push_history(&self, workflow_id: String, proto: RString) -> Result<(), Error> {
let history = History::decode(unsafe { proto.as_slice() })
.map_err(|err| error!("Invalid proto: {}", err))?;
let tx = self.tx.clone();
self.runtime_handle.core.tokio_handle().spawn(async move {
// Intentionally ignoring error here
let _ = tx.send(HistoryForReplay::new(history, workflow_id)).await;
});
Ok(())
}
}

fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
WorkerConfigBuilder::default()
.namespace(options.member::<String>(id!("namespace"))?)
.task_queue(options.member::<String>(id!("task_queue"))?)
.worker_build_id(options.member::<String>(id!("build_id"))?)
.client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
.max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
.max_concurrent_wft_polls(
options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
)
.nonsticky_to_sticky_poll_ratio(
options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
)
.max_concurrent_at_polls(
options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
)
.no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
.sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
))
.max_heartbeat_throttle_interval(Duration::from_secs_f64(
options.member(id!("max_heartbeat_throttle_interval"))?,
))
.default_heartbeat_throttle_interval(Duration::from_secs_f64(
options.member(id!("default_heartbeat_throttle_interval"))?,
))
.max_worker_activities_per_second(
options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
)
.max_task_queue_activities_per_second(
options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
)
.graceful_shutdown_period(Duration::from_secs_f64(
options.member(id!("graceful_shutdown_period"))?,
))
.use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
.tuner(Arc::new(build_tuner(
options
.child(id!("tuner"))?
.ok_or_else(|| error!("Missing tuner"))?,
)?))
.workflow_failure_errors(
if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
HashSet::from([WorkflowErrorType::Nondeterminism])
} else {
HashSet::new()
},
)
.workflow_types_to_failure_errors(
options
.member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
.into_iter()
.map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.build()
.map_err(|err| error!("Invalid worker options: {}", err))
}

fn build_tuner(options: Struct) -> Result<TunerHolder, Error> {
let (workflow_slot_options, resource_slot_options) = build_tuner_slot_options(
options