From c7b17892ca3dcfb973de8ce629c8b723cbfaf665 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Thu, 23 Jan 2025 09:05:55 -0600 Subject: [PATCH] Workflow replayer support Fixes #187 --- README.md | 44 ++- temporalio/Cargo.lock | 1 + temporalio/ext/Cargo.toml | 1 + temporalio/ext/src/worker.rs | 181 +++++---- .../internal/worker/workflow_worker.rb | 72 +++- temporalio/lib/temporalio/worker.rb | 76 ++-- temporalio/lib/temporalio/worker/tuner.rb | 38 ++ .../temporalio/worker/workflow_executor.rb | 2 +- .../worker/workflow_executor/thread_pool.rb | 10 +- .../temporalio/worker/workflow_replayer.rb | 343 ++++++++++++++++++ temporalio/lib/temporalio/workflow_history.rb | 27 +- .../sig/temporalio/internal/bridge/worker.rbs | 6 + .../internal/worker/multi_runner.rbs | 21 +- .../internal/worker/workflow_worker.rbs | 25 +- temporalio/sig/temporalio/worker.rbs | 5 - temporalio/sig/temporalio/worker/tuner.rbs | 4 + .../temporalio/worker/workflow_executor.rbs | 2 +- .../worker/workflow_executor/thread_pool.rbs | 3 +- .../temporalio/worker/workflow_replayer.rbs | 96 +++++ .../sig/temporalio/workflow_history.rbs | 6 + .../test/worker/workflow_replayer_test.rb | 167 +++++++++ 21 files changed, 976 insertions(+), 154 deletions(-) create mode 100644 temporalio/lib/temporalio/worker/workflow_replayer.rb create mode 100644 temporalio/sig/temporalio/worker/workflow_replayer.rbs create mode 100644 temporalio/test/worker/workflow_replayer_test.rb diff --git a/README.md b/README.md index 58881758..753c34e5 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ until the SDK is marked stable. - [Activity Worker Shutdown](#activity-worker-shutdown) - [Activity Concurrency and Executors](#activity-concurrency-and-executors) - [Activity Testing](#activity-testing) + - [Ractors](#ractors) - [Platform Support](#platform-support) - [Development](#development) - [Build](#build) @@ -853,7 +854,48 @@ the mock activity class to make it appear as the real name. #### Workflow Replay -TODO: Workflow replayer not yet implemented +Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example, +assuming the `history_json` parameter below is given a JSON string of history exported from the CLI or web UI, the +following function will replay it: + +```ruby +def replay_from_json(history_json) + replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) + replayer.replay_workflow(Temporalio::WorkflowHistory.from_history_json(history_json)) +end +``` + +If there is a non-determinism, this will raise an exception by default. + +Workflow history can be loaded from more than just JSON. It can be fetched individually from a workflow handle, or even +in a list. For example, the following code will check that all workflow histories for a certain workflow type (i.e. +workflow class) are safe with the current workflow code. + +```ruby +def check_past_histories(client) + replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) + results = replayer.replay_workflows(client.list_workflows("WorkflowType = 'MyWorkflow'").map do |desc| + client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history + end) + results.each { |res| raise res.replay_failure if res.replay_failure } +end +``` + +But this only raises at the end because by default `replay_workflows` does not raise on failure like `replay_workflow` +does. 
The `raise_on_replay_failure: true` parameter could be set, or the replay worker can be used to process each one +like so: + +```ruby +def check_past_histories(client) + Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) do |worker| + client.list_workflows("WorkflowType = 'MyWorkflow'").each do |desc| + worker.replay_workflow(client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history) + end + end +end +``` + +See the `WorkflowReplayer` API documentation for more details. ### Activities diff --git a/temporalio/Cargo.lock b/temporalio/Cargo.lock index 98927c2e..5e9231d9 100644 --- a/temporalio/Cargo.lock +++ b/temporalio/Cargo.lock @@ -3282,6 +3282,7 @@ dependencies = [ "temporal-sdk-core-api", "temporal-sdk-core-protos", "tokio", + "tokio-stream", "tokio-util", "tonic", "tracing", diff --git a/temporalio/ext/Cargo.toml b/temporalio/ext/Cargo.toml index 32288896..fe91918b 100644 --- a/temporalio/ext/Cargo.toml +++ b/temporalio/ext/Cargo.toml @@ -20,6 +20,7 @@ temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [" temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" } temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" +tokio-stream = "0.1" tokio-util = "0.7" tonic = "0.12" tracing = "0.1" diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 2c3947d6..ea39a061 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ client::Client, enter_sync, error, id, new_error, - runtime::{AsyncCommand, RuntimeHandle}, + runtime::{AsyncCommand, Runtime, RuntimeHandle}, util::{AsyncCallback, Struct}, ROOT_MOD, }; @@ -20,8 +20,9 @@ use magnus::{ }; use prost::Message; use temporal_sdk_core::{ + replay::{HistoryForReplay, ReplayWorkerInput}, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, - SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfigBuilder, + SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder, }; use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, @@ -29,6 +30,9 @@ use temporal_sdk_core_api::{ }; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion}; +use temporal_sdk_core_protos::temporal::api::history::v1::History; +use tokio::sync::mpsc::{channel, Sender}; +use tokio_stream::wrappers::ReceiverStream; pub fn init(ruby: &Ruby) -> Result<(), Error> { let class = ruby @@ -55,6 +59,11 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> { )?; class.define_method("replace_client", method!(Worker::replace_client, 1))?; class.define_method("initiate_shutdown", method!(Worker::initiate_shutdown, 0))?; + + let inner_class = class.define_class("WorkflowReplayer", class::object())?; + inner_class.define_singleton_method("new", function!(WorkflowReplayer::new, 2))?; + inner_class.define_method("push_history", method!(WorkflowReplayer::push_history, 2))?; + Ok(()) } @@ -89,70 +98,13 @@ impl Worker { let activity = options.member::(id!("activity"))?; let workflow = options.member::(id!("workflow"))?; - // Build config - let config = WorkerConfigBuilder::default() - .namespace(options.member::(id!("namespace"))?) - .task_queue(options.member::(id!("task_queue"))?) - .worker_build_id(options.member::(id!("build_id"))?) 
- .client_identity_override(options.member::>(id!("identity_override"))?) - .max_cached_workflows(options.member::(id!("max_cached_workflows"))?) - .max_concurrent_wft_polls( - options.member::(id!("max_concurrent_workflow_task_polls"))?, - ) - .nonsticky_to_sticky_poll_ratio( - options.member::(id!("nonsticky_to_sticky_poll_ratio"))?, - ) - .max_concurrent_at_polls( - options.member::(id!("max_concurrent_activity_task_polls"))?, - ) - .no_remote_activities(options.member::(id!("no_remote_activities"))?) - .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64( - options.member(id!("sticky_queue_schedule_to_start_timeout"))?, - )) - .max_heartbeat_throttle_interval(Duration::from_secs_f64( - options.member(id!("max_heartbeat_throttle_interval"))?, - )) - .default_heartbeat_throttle_interval(Duration::from_secs_f64( - options.member(id!("default_heartbeat_throttle_interval"))?, - )) - .max_worker_activities_per_second( - options.member::>(id!("max_worker_activities_per_second"))?, - ) - .max_task_queue_activities_per_second( - options.member::>(id!("max_task_queue_activities_per_second"))?, - ) - .graceful_shutdown_period(Duration::from_secs_f64( - options.member(id!("graceful_shutdown_period"))?, - )) - .use_worker_versioning(options.member::(id!("use_worker_versioning"))?) - .tuner(Arc::new(build_tuner( - options - .child(id!("tuner"))? - .ok_or_else(|| error!("Missing tuner"))?, - )?)) - .workflow_failure_errors( - if options.member::(id!("nondeterminism_as_workflow_fail"))? { - HashSet::from([WorkflowErrorType::Nondeterminism]) - } else { - HashSet::new() - }, - ) - .workflow_types_to_failure_errors( - options - .member::>(id!("nondeterminism_as_workflow_fail_for_types"))? - .into_iter() - .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism]))) - .collect::>>(), - ) - .build() - .map_err(|err| error!("Invalid worker options: {}", err))?; - let worker = temporal_sdk_core::init_worker( &client.runtime_handle.core, - config, + build_config(options)?, client.core.clone().into_inner(), ) .map_err(|err| error!("Failed creating worker: {}", err))?; + Ok(Worker { core: RefCell::new(Some(Arc::new(worker))), runtime_handle: client.runtime_handle.clone(), @@ -435,6 +387,113 @@ impl Worker { } } +#[derive(DataTypeFunctions, TypedData)] +#[magnus( + class = "Temporalio::Internal::Bridge::Worker::WorkflowReplayer", + free_immediately +)] +pub struct WorkflowReplayer { + tx: Sender, + runtime_handle: RuntimeHandle, +} + +impl WorkflowReplayer { + pub fn new(runtime: &Runtime, options: Struct) -> Result<(Self, Worker), Error> { + enter_sync!(runtime.handle.clone()); + + let (tx, rx) = channel(1); + + let core_worker = temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new( + build_config(options)?, + ReceiverStream::new(rx), + )) + .map_err(|err| error!("Failed creating worker: {}", err))?; + + Ok(( + WorkflowReplayer { + tx, + runtime_handle: runtime.handle.clone(), + }, + Worker { + core: RefCell::new(Some(Arc::new(core_worker))), + runtime_handle: runtime.handle.clone(), + activity: false, + workflow: true, + }, + )) + } + + pub fn push_history(&self, workflow_id: String, proto: RString) -> Result<(), Error> { + let history = History::decode(unsafe { proto.as_slice() }) + .map_err(|err| error!("Invalid proto: {}", err))?; + let tx = self.tx.clone(); + self.runtime_handle.core.tokio_handle().spawn(async move { + // Intentionally ignoring error here + let _ = tx.send(HistoryForReplay::new(history, workflow_id)).await; + }); + Ok(()) + } +} + +fn build_config(options: Struct) 
-> Result { + WorkerConfigBuilder::default() + .namespace(options.member::(id!("namespace"))?) + .task_queue(options.member::(id!("task_queue"))?) + .worker_build_id(options.member::(id!("build_id"))?) + .client_identity_override(options.member::>(id!("identity_override"))?) + .max_cached_workflows(options.member::(id!("max_cached_workflows"))?) + .max_concurrent_wft_polls( + options.member::(id!("max_concurrent_workflow_task_polls"))?, + ) + .nonsticky_to_sticky_poll_ratio( + options.member::(id!("nonsticky_to_sticky_poll_ratio"))?, + ) + .max_concurrent_at_polls( + options.member::(id!("max_concurrent_activity_task_polls"))?, + ) + .no_remote_activities(options.member::(id!("no_remote_activities"))?) + .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64( + options.member(id!("sticky_queue_schedule_to_start_timeout"))?, + )) + .max_heartbeat_throttle_interval(Duration::from_secs_f64( + options.member(id!("max_heartbeat_throttle_interval"))?, + )) + .default_heartbeat_throttle_interval(Duration::from_secs_f64( + options.member(id!("default_heartbeat_throttle_interval"))?, + )) + .max_worker_activities_per_second( + options.member::>(id!("max_worker_activities_per_second"))?, + ) + .max_task_queue_activities_per_second( + options.member::>(id!("max_task_queue_activities_per_second"))?, + ) + .graceful_shutdown_period(Duration::from_secs_f64( + options.member(id!("graceful_shutdown_period"))?, + )) + .use_worker_versioning(options.member::(id!("use_worker_versioning"))?) + .tuner(Arc::new(build_tuner( + options + .child(id!("tuner"))? + .ok_or_else(|| error!("Missing tuner"))?, + )?)) + .workflow_failure_errors( + if options.member::(id!("nondeterminism_as_workflow_fail"))? { + HashSet::from([WorkflowErrorType::Nondeterminism]) + } else { + HashSet::new() + }, + ) + .workflow_types_to_failure_errors( + options + .member::>(id!("nondeterminism_as_workflow_fail_for_types"))? + .into_iter() + .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism]))) + .collect::>>(), + ) + .build() + .map_err(|err| error!("Invalid worker options: {}", err)) +} + fn build_tuner(options: Struct) -> Result { let (workflow_slot_options, resource_slot_options) = build_tuner_slot_options( options diff --git a/temporalio/lib/temporalio/internal/worker/workflow_worker.rb b/temporalio/lib/temporalio/internal/worker/workflow_worker.rb index 99d08269..119fd4f3 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_worker.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_worker.rb @@ -33,11 +33,47 @@ def self.workflow_definitions(workflows) end end - def initialize(worker:, bridge_worker:, workflow_definitions:) - @executor = worker.options.workflow_executor + def self.bridge_workflow_failure_exception_type_options( + workflow_failure_exception_types:, + workflow_definitions: + ) + as_fail = workflow_failure_exception_types.any? do |t| + t.is_a?(Class) && t >= Workflow::NondeterminismError + end + as_fail_for_types = workflow_definitions.values.map do |defn| + next unless defn.failure_exception_types.any? 
{ |t| t.is_a?(Class) && t >= Workflow::NondeterminismError } - payload_codec = worker.options.client.data_converter.payload_codec - @workflow_payload_codec_thread_pool = worker.options.workflow_payload_codec_thread_pool + # If they tried to do this on a dynamic workflow and haven't already set worker-level option, warn + unless defn.name || as_fail + warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \ + 'workflow_failure_exception_types should be set to capture that if that is the intention') + end + defn.name + end.compact + [as_fail, as_fail_for_types] + end + + def initialize( + bridge_worker:, + namespace:, + task_queue:, + workflow_definitions:, + workflow_executor:, + logger:, + data_converter:, + metric_meter:, + workflow_interceptors:, + disable_eager_activity_execution:, + illegal_workflow_calls:, + workflow_failure_exception_types:, + workflow_payload_codec_thread_pool:, + debug_mode:, + on_eviction: nil + ) + @executor = workflow_executor + + payload_codec = data_converter.payload_codec + @workflow_payload_codec_thread_pool = workflow_payload_codec_thread_pool if !Fiber.current_scheduler && payload_codec && !@workflow_payload_codec_thread_pool raise ArgumentError, 'Must have workflow payload codec thread pool if providing codec and not using fibers' end @@ -55,19 +91,19 @@ def initialize(worker:, bridge_worker:, workflow_definitions:) @state = State.new( workflow_definitions:, bridge_worker:, - logger: worker.options.logger, - metric_meter: worker.options.client.connection.options.runtime.metric_meter, - data_converter: worker.options.client.data_converter, - deadlock_timeout: worker.options.debug_mode ? nil : 2.0, + logger:, + metric_meter:, + data_converter:, + deadlock_timeout: debug_mode ? nil : 2.0, # TODO(cretz): Make this more performant for the default set? 
illegal_calls: WorkflowInstance::IllegalCallTracer.frozen_validated_illegal_calls( - worker.options.illegal_workflow_calls || {} + illegal_workflow_calls || {} ), - namespace: worker.options.client.namespace, - task_queue: worker.options.task_queue, - disable_eager_activity_execution: worker.options.disable_eager_activity_execution, - workflow_interceptors: worker._workflow_interceptors, - workflow_failure_exception_types: worker.options.workflow_failure_exception_types.map do |t| + namespace:, + task_queue:, + disable_eager_activity_execution:, + workflow_interceptors:, + workflow_failure_exception_types: workflow_failure_exception_types.map do |t| unless t.is_a?(Class) && t < Exception raise ArgumentError, 'All failure types must classes inheriting Exception' end @@ -75,9 +111,10 @@ def initialize(worker:, bridge_worker:, workflow_definitions:) t end.freeze ) + @state.on_eviction = on_eviction if on_eviction # Validate worker - @executor._validate_worker(worker, @state) + @executor._validate_worker(self, @state) end def handle_activation(runner:, activation:, decoded:) @@ -149,6 +186,8 @@ class State :illegal_calls, :namespace, :task_queue, :disable_eager_activity_execution, :workflow_interceptors, :workflow_failure_exception_types + attr_writer :on_eviction + def initialize( workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout:, illegal_calls:, namespace:, task_queue:, disable_eager_activity_execution:, @@ -182,8 +221,9 @@ def get_or_create_running_workflow(run_id, &) instance end - def evict_running_workflow(run_id) + def evict_running_workflow(run_id, cache_remove_job) @running_workflows_mutex.synchronize { @running_workflows.delete(run_id) } + @on_eviction&.call(run_id, cache_remove_job) end def evict_all diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index b3ca0ced..4a0cb216 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -137,7 +137,8 @@ def self.run_all( case event when Internal::Worker::MultiRunner::Event::PollSuccess # Successful poll - event.worker._on_poll_bytes(runner, event.worker_type, event.bytes) + event.worker #: Worker + ._on_poll_bytes(runner, event.worker_type, event.bytes) when Internal::Worker::MultiRunner::Event::PollFailure # Poll failure, this causes shutdown of all workers logger.error('Poll failure (beginning worker shutdown if not already occurring)') @@ -274,7 +275,7 @@ def self.default_illegal_workflow_calls end end - # @return [Options] Frozen options for this client which has the same attributes as {initialize}. + # @return [Options] Options for this worker which has the same attributes as {initialize}. attr_reader :options # Create a new worker. At least one activity or workflow must be present. @@ -411,19 +412,10 @@ def initialize( # Preload workflow definitions and some workflow settings for the bridge workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows) - nondeterminism_as_workflow_fail = workflow_failure_exception_types.any? do |t| - t.is_a?(Class) && t >= Workflow::NondeterminismError - end - nondeterminism_as_workflow_fail_for_types = workflow_definitions.values.map do |defn| - next unless defn.failure_exception_types.any? 
{ |t| t.is_a?(Class) && t >= Workflow::NondeterminismError } - - # If they tried to do this on a dynamic workflow and haven't already set worker-level option, warn - unless defn.name || nondeterminism_as_workflow_fail - warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \ - 'workflow_failure_exception_types should be set to capture that if that is the intention') - end - defn.name - end.compact + nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types = + Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options( + workflow_failure_exception_types:, workflow_definitions: + ) # Create the bridge worker @bridge_worker = Internal::Bridge::Worker.new( @@ -433,11 +425,7 @@ def initialize( workflow: !workflows.empty?, namespace: client.namespace, task_queue:, - tuner: Internal::Bridge::Worker::TunerOptions.new( - workflow_slot_supplier: to_bridge_slot_supplier_options(tuner.workflow_slot_supplier), - activity_slot_supplier: to_bridge_slot_supplier_options(tuner.activity_slot_supplier), - local_activity_slot_supplier: to_bridge_slot_supplier_options(tuner.local_activity_slot_supplier) - ), + tuner: tuner._to_bridge_options, build_id:, identity_override: identity, max_cached_workflows:, @@ -476,9 +464,22 @@ def initialize( bridge_worker: @bridge_worker) end unless workflows.empty? - @workflow_worker = Internal::Worker::WorkflowWorker.new(worker: self, - bridge_worker: @bridge_worker, - workflow_definitions:) + @workflow_worker = Internal::Worker::WorkflowWorker.new( + bridge_worker: @bridge_worker, + namespace: client.namespace, + task_queue:, + workflow_definitions:, + workflow_executor:, + logger:, + data_converter: client.data_converter, + metric_meter: client.connection.options.runtime.metric_meter, + workflow_interceptors: @workflow_interceptors, + disable_eager_activity_execution:, + illegal_workflow_calls:, + workflow_failure_exception_types:, + workflow_payload_codec_thread_pool:, + debug_mode: + ) end # Validate worker @@ -543,11 +544,6 @@ def _activity_interceptors @activity_interceptors end - # @!visibility private - def _workflow_interceptors - @workflow_interceptors - end - # @!visibility private def _on_poll_bytes(runner, worker_type, bytes) case worker_type @@ -569,29 +565,5 @@ def _on_shutdown_complete @workflow_worker&.on_shutdown_complete @workflow_worker = nil end - - private - - def to_bridge_slot_supplier_options(slot_supplier) - if slot_supplier.is_a?(Tuner::SlotSupplier::Fixed) - Internal::Bridge::Worker::TunerSlotSupplierOptions.new( - fixed_size: slot_supplier.slots, - resource_based: nil - ) - elsif slot_supplier.is_a?(Tuner::SlotSupplier::ResourceBased) - Internal::Bridge::Worker::TunerSlotSupplierOptions.new( - fixed_size: nil, - resource_based: Internal::Bridge::Worker::TunerResourceBasedSlotSupplierOptions.new( - target_mem_usage: slot_supplier.tuner_options.target_memory_usage, - target_cpu_usage: slot_supplier.tuner_options.target_cpu_usage, - min_slots: slot_supplier.slot_options.min_slots, - max_slots: slot_supplier.slot_options.max_slots, - ramp_throttle: slot_supplier.slot_options.ramp_throttle - ) - ) - else - raise ArgumentError, 'Tuner slot suppliers must be instances of Fixed or ResourceBased' - end - end end end diff --git a/temporalio/lib/temporalio/worker/tuner.rb b/temporalio/lib/temporalio/worker/tuner.rb index cb495d7a..60f8916b 100644 --- a/temporalio/lib/temporalio/worker/tuner.rb +++ b/temporalio/lib/temporalio/worker/tuner.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true 
+require 'temporalio/internal/bridge/worker' + module Temporalio class Worker # Worker tuner that allows for dynamic customization of some aspects of worker configuration. @@ -18,6 +20,14 @@ class Fixed < SlotSupplier def initialize(slots) # rubocop:disable Lint/MissingSuper @slots = slots end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::TunerSlotSupplierOptions.new( + fixed_size: slots, + resource_based: nil + ) + end end # A slot supplier that will dynamically adjust the number of slots based on resource usage. @@ -34,6 +44,25 @@ def initialize(tuner_options:, slot_options:) # rubocop:disable Lint/MissingSupe @tuner_options = tuner_options @slot_options = slot_options end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::TunerSlotSupplierOptions.new( + fixed_size: nil, + resource_based: Internal::Bridge::Worker::TunerResourceBasedSlotSupplierOptions.new( + target_mem_usage: tuner_options.target_memory_usage, + target_cpu_usage: tuner_options.target_cpu_usage, + min_slots: slot_options.min_slots, + max_slots: slot_options.max_slots, + ramp_throttle: slot_options.ramp_throttle + ) + ) + end + end + + # @!visibility private + def _to_bridge_options + raise ArgumentError, 'Tuner slot suppliers must be instances of Fixed or ResourceBased' end end @@ -146,6 +175,15 @@ def initialize( @activity_slot_supplier = activity_slot_supplier @local_activity_slot_supplier = local_activity_slot_supplier end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::TunerOptions.new( + workflow_slot_supplier: workflow_slot_supplier._to_bridge_options, + activity_slot_supplier: activity_slot_supplier._to_bridge_options, + local_activity_slot_supplier: local_activity_slot_supplier._to_bridge_options + ) + end end end end diff --git a/temporalio/lib/temporalio/worker/workflow_executor.rb b/temporalio/lib/temporalio/worker/workflow_executor.rb index 77ce413c..dcb51ba3 100644 --- a/temporalio/lib/temporalio/worker/workflow_executor.rb +++ b/temporalio/lib/temporalio/worker/workflow_executor.rb @@ -13,7 +13,7 @@ def initialize end # @!visibility private - def _validate_worker(worker, worker_state) + def _validate_worker(workflow_worker, worker_state) raise NotImplementedError end diff --git a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb index cf57f679..962ea984 100644 --- a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb +++ b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb @@ -36,7 +36,7 @@ def initialize(max_threads: [4, Etc.nprocessors].max, thread_pool: Temporalio::W end # @!visibility private - def _validate_worker(worker, worker_state) + def _validate_worker(workflow_worker, worker_state) # Do nothing end @@ -137,7 +137,7 @@ def activate(activation, worker_state, &) # If it's eviction only, just evict inline and do nothing else if cache_remove_job && activation.jobs.size == 1 - evict(worker_state, activation.run_id) + evict(worker_state, activation.run_id, cache_remove_job) worker_state.logger.debug('Sending empty workflow completion') if LOG_ACTIVATIONS yield Internal::Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new( run_id: activation.run_id, @@ -173,7 +173,7 @@ def activate(activation, worker_state, &) end # Go ahead and evict if there is an eviction job - evict(worker_state, activation.run_id) if cache_remove_job + evict(worker_state, activation.run_id, cache_remove_job) if 
cache_remove_job # Complete the activation worker_state.logger.debug("Sending workflow completion: #{completion}") if LOG_ACTIVATIONS @@ -214,8 +214,8 @@ def create_instance(initial_activation, worker_state) ) end - def evict(worker_state, run_id) - worker_state.evict_running_workflow(run_id) + def evict(worker_state, run_id, cache_remove_job) + worker_state.evict_running_workflow(run_id, cache_remove_job) @executor._remove_workflow(worker_state, run_id) end end diff --git a/temporalio/lib/temporalio/worker/workflow_replayer.rb b/temporalio/lib/temporalio/worker/workflow_replayer.rb new file mode 100644 index 00000000..6dd6d354 --- /dev/null +++ b/temporalio/lib/temporalio/worker/workflow_replayer.rb @@ -0,0 +1,343 @@ +# frozen_string_literal: true + +require 'temporalio/api' +require 'temporalio/converters' +require 'temporalio/internal/bridge' +require 'temporalio/internal/bridge/worker' +require 'temporalio/internal/worker/multi_runner' +require 'temporalio/internal/worker/workflow_worker' +require 'temporalio/worker/interceptor' +require 'temporalio/worker/thread_pool' +require 'temporalio/worker/tuner' +require 'temporalio/worker/workflow_executor' +require 'temporalio/workflow' +require 'temporalio/workflow_history' + +module Temporalio + class Worker + # Replayer to replay workflows from existing history. + class WorkflowReplayer + Options = Data.define( + :workflows, + :namespace, + :task_queue, + :data_converter, + :workflow_executor, + :interceptors, + :build_id, + :identity, + :logger, + :illegal_workflow_calls, + :workflow_failure_exception_types, + :workflow_payload_codec_thread_pool, + :debug_mode, + :runtime + ) + + # Options as returned from {options} representing the options passed to the constructor. + class Options; end # rubocop:disable Lint/EmptyClass + + # @return [Options] Options for this replayer which has the same attributes as {initialize}. + attr_reader :options + + # Create a new replayer. This combines some options from both {Worker.initialize} and {Client.initialize}. + # + # @param workflows [Array>] Workflows for this replayer. + # @param namespace [String] Namespace as set in the workflow info. + # @param task_queue [String] Task queue as set in the workflow info. + # @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from + # payloads. + # @param workflow_executor [WorkflowExecutor] Workflow executor that workflow tasks run within. This must be a + # {WorkflowExecutor::ThreadPool} currently. + # @param interceptors [Array] Workflow interceptors. + # @param build_id [String] Unique identifier for the current runtime. This is best set as a unique value + # representing all code and should change only when code does. This can be something like a git commit hash. If + # unset, default is hash of known Ruby code. + # @param identity [String, nil] Override the identity for this replater. + # @param logger [Logger] Logger to use. Defaults to stdout with warn level. Callers setting this logger are + # responsible for closing it. + # @param illegal_workflow_calls [Hash]>] Set of illegal workflow calls that are + # considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string + # class name (no leading `::`). The value is either `:all` which means any use of the class, or an array of + # symbols for methods on the class that cannot be used. The methods refer to either instance or class methods, + # there is no way to differentiate at this time. 
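+      #   For example, a value such as `{ 'Random' => :all, 'Time' => [:now] }` would disallow any use of `Random` and calls to `Time.now` in workflow code.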
+ # @param workflow_failure_exception_types [Array>] Workflow failure exception types. This is the + # set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail + # instead of suspending the workflow via task failure. These are applied in addition to the + # `workflow_failure_exception_type` on the workflow definition class itself. If {::Exception} is set, it + # effectively will fail a workflow/update in all user exception cases. + # @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode + # within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially + # block execution which is why they need to be run in the background. + # @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks + # if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is + # `true` or `1`. + # @param runtime [Runtime] Runtime for this replayer. + # + # @yield If a block is present, this is the equivalent of calling {with_replay_worker} with the block and + # discarding the result. + def initialize( + workflows:, + namespace: 'ReplayNamespace', + task_queue: 'ReplayTaskQueue', + data_converter: Converters::DataConverter.default, + workflow_executor: WorkflowExecutor::ThreadPool.default, + interceptors: [], + build_id: Worker.default_build_id, + identity: nil, + logger: Logger.new($stdout, level: Logger::WARN), + illegal_workflow_calls: Worker.default_illegal_workflow_calls, + workflow_failure_exception_types: [], + workflow_payload_codec_thread_pool: nil, + debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase), + runtime: Runtime.default, + & + ) + @options = Options.new( + workflows:, + namespace:, + task_queue:, + data_converter:, + workflow_executor:, + interceptors:, + build_id:, + identity:, + logger:, + illegal_workflow_calls:, + workflow_failure_exception_types:, + workflow_payload_codec_thread_pool:, + debug_mode:, + runtime: + ).freeze + # Preload definitions and other settings + @workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows) + @nondeterminism_as_workflow_fail, @nondeterminism_as_workflow_fail_for_types = + Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options( + workflow_failure_exception_types:, workflow_definitions: @workflow_definitions + ) + # If there is a block, we'll go ahead and assume it's for with_replay_worker + with_replay_worker(&) if block_given? # steep:ignore + end + + # Replay a workflow history. + # + # If doing multiple histories, it is better to use {replay_workflows} or {with_replay_worker} since they create + # a replay worker just once instead of each time like this call does. + # + # @param history [WorkflowHistory] History to replay. + # @param raise_on_replay_failure [Boolean] If true, the default, this will raise an exception on any replay + # failure. If false and the replay fails, the failure will be available in {ReplayResult.replay_failure}. + # + # @return [ReplayResult] Result of the replay. + def replay_workflow(history, raise_on_replay_failure: true) + with_replay_worker { |worker| worker.replay_workflow(history, raise_on_replay_failure:) } + end + + # Replay multiple workflow histories. + # + # @param histories [Enumerable] Histories to replay. 
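+      #   Each history is replayed sequentially as the enumerable yields it, since only one workflow can replay at a time.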
+ # @param raise_on_replay_failure [Boolean] If true, this will raise an exception on any replay failure. If false, + # the default, and the replay fails, the failure will be available in {ReplayResult.replay_failure}. + # + # @return [Array] Results of the replay. + def replay_workflows(histories, raise_on_replay_failure: false) + with_replay_worker do |worker| + histories.map { |h| worker.replay_workflow(h, raise_on_replay_failure:) } + end + end + + # Run a block of code with a {ReplayWorker} to execute replays. + # + # @yield Block of code to run with a replay worker. + # @yieldparam [ReplayWorker] Worker to run replays on. Note, only one workflow can replay at a time. + # @yieldreturn [Object] Result of the block. + def with_replay_worker(&) + worker = ReplayWorker.new( + options:, + workflow_definitions: @workflow_definitions, + nondeterminism_as_workflow_fail: @nondeterminism_as_workflow_fail, + nondeterminism_as_workflow_fail_for_types: @nondeterminism_as_workflow_fail_for_types + ) + begin + yield worker + ensure + worker._shutdown + end + end + + # Result of a single workflow replay run. + class ReplayResult + # @return [WorkflowHistory] History originally passed in to the replayer. + attr_reader :history + + # @return [Exception, nil] Failure during replay if any. + attr_reader :replay_failure + + # @!visibility private + def initialize(history:, replay_failure:) + @history = history + @replay_failure = replay_failure + end + end + + # Replay worker that can be used to replay individual workflow runs. Only one call to {replay_workflow} can be + # made at a time. + class ReplayWorker + # @!visibility private + def initialize( + options:, + workflow_definitions:, + nondeterminism_as_workflow_fail:, + nondeterminism_as_workflow_fail_for_types: + ) + # Create the bridge worker and the replayer + @bridge_replayer, @bridge_worker = Internal::Bridge::Worker::WorkflowReplayer.new( + options.runtime._core_runtime, + Internal::Bridge::Worker::Options.new( + activity: false, + workflow: true, + namespace: options.namespace, + task_queue: options.task_queue, + tuner: Tuner.create_fixed( + workflow_slots: 2, activity_slots: 1, local_activity_slots: 1 + )._to_bridge_options, + build_id: options.build_id, + identity_override: options.identity, + max_cached_workflows: 2, + max_concurrent_workflow_task_polls: 1, + nonsticky_to_sticky_poll_ratio: 1.0, + max_concurrent_activity_task_polls: 1, + no_remote_activities: true, + sticky_queue_schedule_to_start_timeout: 1.0, + max_heartbeat_throttle_interval: 1.0, + default_heartbeat_throttle_interval: 1.0, + max_worker_activities_per_second: nil, + max_task_queue_activities_per_second: nil, + graceful_shutdown_period: 0.0, + use_worker_versioning: false, + nondeterminism_as_workflow_fail:, + nondeterminism_as_workflow_fail_for_types: + ) + ) + + # Create the workflow worker + @workflow_worker = Internal::Worker::WorkflowWorker.new( + bridge_worker: @bridge_worker, + namespace: options.namespace, + task_queue: options.task_queue, + workflow_definitions:, + workflow_executor: options.workflow_executor, + logger: options.logger, + data_converter: options.data_converter, + metric_meter: options.runtime.metric_meter, + workflow_interceptors: options.interceptors.select do |i| + i.is_a?(Interceptor::Workflow) + end, + disable_eager_activity_execution: false, + illegal_workflow_calls: options.illegal_workflow_calls, + workflow_failure_exception_types: options.workflow_failure_exception_types, + workflow_payload_codec_thread_pool: 
options.workflow_payload_codec_thread_pool, + debug_mode: options.debug_mode, + on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job } # steep:ignore + ) + + # Create the runner + @runner = Internal::Worker::MultiRunner.new(workers: [self], shutdown_signals: []) + end + + # Replay a workflow history. + # + # @param history [WorkflowHistory] History to replay. + # @param raise_on_replay_failure [Boolean] If true, the default, this will raise an exception on any replay + # failure. If false and the replay fails, the failure will be available in {ReplayResult.replay_failure}. + # + # @return [ReplayResult] Result of the replay. + def replay_workflow(history, raise_on_replay_failure: true) + raise ArgumentError, 'Expected history as WorkflowHistory' unless history.is_a?(WorkflowHistory) + # Due to our event processing model, only one can run at a time + raise 'Already running' if @running + raise 'Replayer shutdown' if @shutdown + + # Push history proto + # TODO(cretz): Unset this + @running = true + @last_workflow_remove_job = nil + begin + @bridge_replayer.push_history( + history.workflow_id, Api::History::V1::History.new(events: history.events).to_proto + ) + + # Process events until workflow complete + until @last_workflow_remove_job + event = @runner.next_event + case event + when Internal::Worker::MultiRunner::Event::PollSuccess + @workflow_worker.handle_activation( + runner: @runner, + activation: Internal::Bridge::Api::WorkflowActivation::WorkflowActivation.decode(event.bytes), + decoded: false + ) + when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded + @workflow_worker.handle_activation(runner: @runner, activation: event.activation, decoded: true) + when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete + @workflow_worker.handle_activation_complete( + runner: @runner, + activation_completion: event.activation_completion, + encoded: event.encoded, + completion_complete_queue: event.completion_complete_queue + ) + when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete + # Ignore + else + raise "Unexpected event: #{event}" + end + end + + # Create exception if removal is due to error + err = if @last_workflow_remove_job.reason == :NONDETERMINISM + Workflow::NondeterminismError.new( + "#{@last_workflow_remove_job.reason}: #{@last_workflow_remove_job.message}" + ) + elsif !%i[CACHE_FULL LANG_REQUESTED].include?(@last_workflow_remove_job.reason) + Workflow::InvalidWorkflowStateError.new( + "#{@last_workflow_remove_job.reason}: #{@last_workflow_remove_job.message}" + ) + end + # Raise if wanting to raise, otherwise return result + raise err if raise_on_replay_failure && err + + ReplayResult.new(history:, replay_failure: err) + ensure + @running = false + end + end + + # @!visibility private + def _shutdown + @shutdown = true + @runner.initiate_shutdown + # Wait for all-pollers-shutdown before finalizing + until @runner.next_event.is_a?(Internal::Worker::MultiRunner::Event::AllPollersShutDown); end + @runner.wait_complete_and_finalize_shutdown + @workflow_worker.on_shutdown_complete + @workflow_worker = nil + end + + # @!visibility private + def _bridge_worker + @bridge_worker + end + + # @!visibility private + def _initiate_shutdown + _bridge_worker.initiate_shutdown + end + + # @!visibility private + def _wait_all_complete + # Do nothing + end + end + end + end +end diff --git a/temporalio/lib/temporalio/workflow_history.rb b/temporalio/lib/temporalio/workflow_history.rb index 521a556c..c8eea340 100644 --- 
a/temporalio/lib/temporalio/workflow_history.rb +++ b/temporalio/lib/temporalio/workflow_history.rb @@ -1,9 +1,19 @@ # frozen_string_literal: true +require 'temporalio/api' + module Temporalio # Representation of a workflow's history. class WorkflowHistory - # History events for the workflow. + # Convert a JSON string to workflow history. This supports the JSON format exported by Temporal UI and CLI. + # + # @param json [String] JSON string. + # @return [WorkflowHistory] Converted history. + def self.from_history_json(json) + WorkflowHistory.new(Api::History::V1::History.decode_json(json).events.to_a) + end + + # @return [Array] History events for the workflow. attr_reader :events # @!visibility private @@ -18,5 +28,20 @@ def workflow_id start.workflow_id end + + # Convert to history JSON. + # + # @return [String] JSON string. + def to_history_json + Api::History::V1::History.encode_json(Api::History::V1::History.new(events:)) + end + + # Compare history. + # + # @param other [WorkflowHistory] Other history. + # @return [Boolean] True if equal. + def ==(other) + other.is_a?(WorkflowHistory) && events == other.events + end end end diff --git a/temporalio/sig/temporalio/internal/bridge/worker.rbs b/temporalio/sig/temporalio/internal/bridge/worker.rbs index 2701f8cd..610938ea 100644 --- a/temporalio/sig/temporalio/internal/bridge/worker.rbs +++ b/temporalio/sig/temporalio/internal/bridge/worker.rbs @@ -121,6 +121,12 @@ module Temporalio def replace_client: (Client client) -> void def initiate_shutdown: -> void + + class WorkflowReplayer + def self.new: (Runtime runtime, Options options) -> [WorkflowReplayer, Worker] + + def push_history: (String workflow_id, String proto) -> void + end end end end diff --git a/temporalio/sig/temporalio/internal/worker/multi_runner.rbs b/temporalio/sig/temporalio/internal/worker/multi_runner.rbs index 547e78f5..ddfa2cee 100644 --- a/temporalio/sig/temporalio/internal/worker/multi_runner.rbs +++ b/temporalio/sig/temporalio/internal/worker/multi_runner.rbs @@ -2,7 +2,14 @@ module Temporalio module Internal module Worker class MultiRunner - def initialize: (workers: Array[Temporalio::Worker], shutdown_signals: Array[String | Integer]) -> void + interface _Worker + def _bridge_worker: -> Bridge::Worker + def _initiate_shutdown: -> void + def _wait_all_complete: -> void + end + + + def initialize: (workers: Array[_Worker], shutdown_signals: Array[String | Integer]) -> void def apply_thread_or_fiber_block: ?{ (?) 
-> untyped } -> void @@ -27,12 +34,12 @@ module Temporalio class Event class PollSuccess < Event - attr_reader worker: Temporalio::Worker + attr_reader worker: _Worker attr_reader worker_type: Symbol attr_reader bytes: String def initialize: ( - worker: Temporalio::Worker, + worker: _Worker, worker_type: Symbol, bytes: String ) -> void @@ -73,23 +80,23 @@ module Temporalio end class PollFailure < Event - attr_reader worker: Temporalio::Worker + attr_reader worker: _Worker attr_reader worker_type: Symbol attr_reader error: Exception def initialize: ( - worker: Temporalio::Worker, + worker: _Worker, worker_type: Symbol, error: Exception ) -> void end class PollerShutDown < Event - attr_reader worker: Temporalio::Worker + attr_reader worker: _Worker attr_reader worker_type: Symbol def initialize: ( - worker: Temporalio::Worker, + worker: _Worker, worker_type: Symbol ) -> void end diff --git a/temporalio/sig/temporalio/internal/worker/workflow_worker.rbs b/temporalio/sig/temporalio/internal/worker/workflow_worker.rbs index 193d43df..eeb798f6 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_worker.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_worker.rbs @@ -6,10 +6,27 @@ module Temporalio Array[singleton(Workflow::Definition) | Workflow::Definition::Info] workflows ) -> Hash[String?, Workflow::Definition::Info] + def self.bridge_workflow_failure_exception_type_options: ( + workflow_failure_exception_types: Array[singleton(Exception)], + workflow_definitions: Hash[String?, Workflow::Definition::Info] + ) -> [bool, Array[String]] + def initialize: ( - worker: Temporalio::Worker, bridge_worker: Bridge::Worker, - workflow_definitions: Hash[String?, Workflow::Definition::Info] + namespace: String, + task_queue: String, + workflow_definitions: Hash[String?, Workflow::Definition::Info], + workflow_executor: Temporalio::Worker::WorkflowExecutor, + logger: Logger, + data_converter: Converters::DataConverter, + metric_meter: Temporalio::Metric::Meter, + workflow_interceptors: Array[Temporalio::Worker::Interceptor::Workflow], + disable_eager_activity_execution: bool, + illegal_workflow_calls: Hash[String, :all | Array[Symbol]], + workflow_failure_exception_types: Array[singleton(Exception)], + workflow_payload_codec_thread_pool: Temporalio::Worker::ThreadPool?, + debug_mode: bool, + ?on_eviction: (^(String run_id, untyped cache_remove_job) -> void)? 
) -> void def handle_activation: ( @@ -45,6 +62,8 @@ module Temporalio attr_reader workflow_interceptors: Array[Temporalio::Worker::Interceptor::Workflow] attr_reader workflow_failure_exception_types: Array[singleton(Exception)] + attr_writer on_eviction: ^(String run_id, untyped cache_remove_job) -> void + def initialize: ( workflow_definitions: Hash[String?, Workflow::Definition::Info], bridge_worker: Bridge::Worker, @@ -61,7 +80,7 @@ module Temporalio ) -> void def get_or_create_running_workflow: [T] (String run_id) { -> T } -> T - def evict_running_workflow: (String run_id) -> void + def evict_running_workflow: (String run_id, untyped cache_remove_job) -> void def evict_all: -> void end end diff --git a/temporalio/sig/temporalio/worker.rbs b/temporalio/sig/temporalio/worker.rbs index 71cfdd60..9da961c7 100644 --- a/temporalio/sig/temporalio/worker.rbs +++ b/temporalio/sig/temporalio/worker.rbs @@ -122,12 +122,7 @@ module Temporalio def _wait_all_complete: -> void def _bridge_worker: -> Internal::Bridge::Worker def _activity_interceptors: -> Array[Interceptor::Activity] - def _workflow_interceptors: -> Array[Interceptor::Workflow] def _on_poll_bytes: (Internal::Worker::MultiRunner runner, Symbol worker_type, String bytes) -> void def _on_shutdown_complete: -> void - - private def to_bridge_slot_supplier_options: ( - Tuner::SlotSupplier slot_supplier - ) -> Internal::Bridge::Worker::TunerSlotSupplierOptions end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/worker/tuner.rbs b/temporalio/sig/temporalio/worker/tuner.rbs index 22ee1666..23c08a63 100644 --- a/temporalio/sig/temporalio/worker/tuner.rbs +++ b/temporalio/sig/temporalio/worker/tuner.rbs @@ -17,6 +17,8 @@ module Temporalio slot_options: ResourceBasedSlotOptions ) -> void end + + def _to_bridge_options: -> Internal::Bridge::Worker::TunerSlotSupplierOptions end class ResourceBasedTunerOptions @@ -64,6 +66,8 @@ module Temporalio activity_slot_supplier: SlotSupplier, local_activity_slot_supplier: SlotSupplier ) -> void + + def _to_bridge_options: -> Internal::Bridge::Worker::TunerOptions end end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/worker/workflow_executor.rbs b/temporalio/sig/temporalio/worker/workflow_executor.rbs index e8aada6c..4906bd2b 100644 --- a/temporalio/sig/temporalio/worker/workflow_executor.rbs +++ b/temporalio/sig/temporalio/worker/workflow_executor.rbs @@ -2,7 +2,7 @@ module Temporalio class Worker class WorkflowExecutor def _validate_worker: ( - Internal::Worker::WorkflowWorker worker, + Internal::Worker::WorkflowWorker workflow_worker, Internal::Worker::WorkflowWorker::State worker_state ) -> void diff --git a/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs b/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs index 409ca50b..c3e6d2ea 100644 --- a/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs +++ b/temporalio/sig/temporalio/worker/workflow_executor/thread_pool.rbs @@ -44,7 +44,8 @@ module Temporalio def evict: ( Internal::Worker::WorkflowWorker::State worker_state, - String run_id + String run_id, + untyped cache_remove_job ) -> void end diff --git a/temporalio/sig/temporalio/worker/workflow_replayer.rbs b/temporalio/sig/temporalio/worker/workflow_replayer.rbs new file mode 100644 index 00000000..93d056d5 --- /dev/null +++ b/temporalio/sig/temporalio/worker/workflow_replayer.rbs @@ -0,0 +1,96 @@ +module Temporalio + class Worker + class WorkflowReplayer + class Options + attr_reader workflows: 
Array[singleton(Workflow::Definition) | Workflow::Definition::Info] + attr_reader namespace: String + attr_reader task_queue: String + attr_reader data_converter: Converters::DataConverter + attr_reader workflow_executor: Worker::WorkflowExecutor + attr_reader interceptors: Array[Interceptor::Workflow] + attr_reader build_id: String + attr_reader identity: String? + attr_reader logger: Logger + attr_reader illegal_workflow_calls: Hash[String, :all | Array[Symbol]] + attr_reader workflow_failure_exception_types: Array[singleton(Exception)] + attr_reader workflow_payload_codec_thread_pool: ThreadPool? + attr_reader debug_mode: bool + attr_reader runtime: Runtime + + def initialize: ( + workflows: Array[singleton(Workflow::Definition) | Workflow::Definition::Info], + namespace: String, + task_queue: String, + data_converter: Converters::DataConverter, + workflow_executor: Worker::WorkflowExecutor, + interceptors: Array[Interceptor::Workflow], + build_id: String, + identity: String?, + logger: Logger, + illegal_workflow_calls: Hash[String, :all | Array[Symbol]], + workflow_failure_exception_types: Array[singleton(Exception)], + workflow_payload_codec_thread_pool: ThreadPool?, + debug_mode: bool, + runtime: Runtime + ) -> void + end + + attr_reader options: Options + + def initialize: ( + workflows: Array[singleton(Workflow::Definition) | Workflow::Definition::Info], + ?namespace: String, + ?task_queue: String, + ?data_converter: Converters::DataConverter, + ?workflow_executor: Worker::WorkflowExecutor, + ?interceptors: Array[Interceptor::Workflow], + ?build_id: String, + ?identity: String?, + ?logger: Logger, + ?illegal_workflow_calls: Hash[String, :all | Array[Symbol]], + ?workflow_failure_exception_types: Array[singleton(Exception)], + ?workflow_payload_codec_thread_pool: ThreadPool?, + ?debug_mode: bool, + ?runtime: Runtime + ) ?{ (ReplayWorker worker) -> untyped } -> void + + def replay_workflow: ( + WorkflowHistory history, + ?raise_on_replay_failure: bool + ) -> ReplayResult + + def replay_workflows: ( + Enumerable[WorkflowHistory] histories, + ?raise_on_replay_failure: bool + ) -> Array[ReplayResult] + + def with_replay_worker: [T] { (ReplayWorker worker) -> T } -> T + + class ReplayResult + attr_reader history: WorkflowHistory + attr_reader replay_failure: Exception? + + def initialize: (history: WorkflowHistory, replay_failure: Exception?) 
-> void + end + + class ReplayWorker + def initialize: ( + options: Options, + workflow_definitions: Hash[String?, Workflow::Definition::Info], + nondeterminism_as_workflow_fail: bool, + nondeterminism_as_workflow_fail_for_types: Array[String] + ) -> void + + def replay_workflow: ( + WorkflowHistory history, + ?raise_on_replay_failure: bool + ) -> ReplayResult + + def _shutdown: -> void + def _bridge_worker: -> Internal::Bridge::Worker + def _initiate_shutdown: -> void + def _wait_all_complete: -> void + end + end + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/workflow_history.rbs b/temporalio/sig/temporalio/workflow_history.rbs index 43605ed4..0d2edb8b 100644 --- a/temporalio/sig/temporalio/workflow_history.rbs +++ b/temporalio/sig/temporalio/workflow_history.rbs @@ -1,7 +1,13 @@ module Temporalio class WorkflowHistory + def self.from_history_json: (String json) -> WorkflowHistory + attr_reader events: Array[untyped] def initialize: (Array[untyped] events) -> void + + def workflow_id: -> String + + def to_history_json: -> String end end \ No newline at end of file diff --git a/temporalio/test/worker/workflow_replayer_test.rb b/temporalio/test/worker/workflow_replayer_test.rb new file mode 100644 index 00000000..5ba6ee02 --- /dev/null +++ b/temporalio/test/worker/workflow_replayer_test.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +require 'temporalio/activity' +require 'temporalio/worker/workflow_replayer' +require 'temporalio/workflow' +require 'temporalio/workflow_history' +require 'test' + +module Worker + class WorkflowReplayerTest < Test + class SayHelloActivity < Temporalio::Activity::Definition + def execute(name) + "Hello, #{name}!" + end + end + + class SayHelloWorkflow < Temporalio::Workflow::Definition + workflow_query_attr_reader :waiting + + def execute(params) + result = Temporalio::Workflow.execute_activity( + SayHelloActivity, params['name'], + schedule_to_close_timeout: 10 + ) + + # Wait if requested + if params['should_hang'] + @waiting = true + Temporalio::Workflow.wait_condition { false } + end + + # Raise if requested + raise Temporalio::Error::ApplicationError, 'Intentional error' if params['should_error'] + raise 'Intentional task failure' if params['should_fail_task'] + + # Cause non-determinism if requested + if params['should_cause_non_determinism'] && Temporalio::Workflow::Unsafe.replaying? 
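+        # Sleeping only while replaying adds a timer command that was not recorded in the original history, which is reported as a non-determinism error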
+ Temporalio::Workflow.sleep(0.1) + end + + result + end + end + + def test_simple + # Run simple workflow to completion and get history + history = execute_workflow(SayHelloWorkflow, { name: 'Temporal' }, activities: [SayHelloActivity]) do |handle| + assert_equal 'Hello, Temporal!', handle.result + handle.fetch_history + end + + # Confirm conversion to/from json + history_json = history.to_history_json + assert_equal history, Temporalio::WorkflowHistory.from_history_json(history_json) + + # Replay history in various ways + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflow(history) + .replay_failure + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflows([history]) + .first #: Temporalio::Worker::WorkflowReplayer::ReplayResult + .replay_failure + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflows(Enumerator.new { |y| y << history }) + .first #: Temporalio::Worker::WorkflowReplayer::ReplayResult + .replay_failure + Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) do |w| + assert_nil w.replay_workflow(history).replay_failure + end + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .with_replay_worker { |w| w.replay_workflow(history) } + .replay_failure + histories = env.client + .list_workflows("WorkflowId = '#{history.workflow_id}'") + .map { |e| env.client.workflow_handle(e.id).fetch_history } + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflows(histories) + .first #: Temporalio::Worker::WorkflowReplayer::ReplayResult + .replay_failure + end + + def test_incomplete_run + # Start simple workflow and get history + history = execute_workflow( + SayHelloWorkflow, { name: 'Temporal', should_hang: true }, activities: [SayHelloActivity] + ) do |handle| + # Wait until "waiting" to get history + assert_eventually { assert handle.query(SayHelloWorkflow.waiting) } + handle.fetch_history + end + # Replay + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflow(history) + .replay_failure + end + + def test_failed_run + # Run to failure and get history + history = execute_workflow( + SayHelloWorkflow, { name: 'Temporal', should_error: true }, activities: [SayHelloActivity] + ) do |handle| + assert_raises(Temporalio::Error::WorkflowFailedError) { handle.result } + handle.fetch_history + end + # Replay + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflow(history) + .replay_failure + end + + def test_non_deterministic_run + # Run to completion and get history + history = execute_workflow( + SayHelloWorkflow, { name: 'Temporal', should_cause_non_determinism: true }, activities: [SayHelloActivity] + ) do |handle| + assert_equal 'Hello, Temporal!', handle.result + handle.fetch_history + end + + # Confirm replay raises non-determinism + assert_raises(Temporalio::Workflow::NondeterminismError) do + Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]).replay_workflow(history) + end + + # And returns if not asked to raise + assert_instance_of Temporalio::Workflow::NondeterminismError, + Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflow(history, raise_on_replay_failure: false) + .replay_failure + end + + def test_task_failure + # Run to failure and get history + history = execute_workflow( + 
SayHelloWorkflow, { name: 'Temporal', should_fail_task: true }, activities: [SayHelloActivity] + ) do |handle| + assert_eventually_task_fail(handle:) + handle.fetch_history + end + # Replay + assert_nil Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflow(history) + .replay_failure + end + + def test_multiple_histories + # Run simple workflow to completion and get history + history1 = execute_workflow(SayHelloWorkflow, { name: 'Temporal' }, activities: [SayHelloActivity]) do |handle| + assert_equal 'Hello, Temporal!', handle.result + handle.fetch_history + end + # Run non-deterministic to completion and get history + history2 = execute_workflow( + SayHelloWorkflow, { name: 'Temporal', should_cause_non_determinism: true }, activities: [SayHelloActivity] + ) do |handle| + assert_equal 'Hello, Temporal!', handle.result + handle.fetch_history + end + results = Temporalio::Worker::WorkflowReplayer.new(workflows: [SayHelloWorkflow]) + .replay_workflows([history1, history2]) + assert_equal 2, results.size + assert_nil results.first&.replay_failure # steep:ignore + assert_instance_of Temporalio::Workflow::NondeterminismError, results.last&.replay_failure + end + end +end