Skip to content

Commit

Permalink
Dynamic activities (#198)
Browse files Browse the repository at this point in the history
Fixes #166
  • Loading branch information
cretz authored Jan 17, 2025
1 parent a9034b8 commit 350951a
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 23 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,11 @@ Some notes about activity definition:
"Activity Concurrency and Executors" section later for more details.
* Technically an activity definition can be created manually via `Temporalio::Activity::Definition::Info.new` that
accepts a proc or a block, but the class form is recommended.
* `activity_dynamic` can be used to mark an activity dynamic. Dynamic activities do not have names and handle any
activity that is not otherwise registered. A worker can only have one dynamic activity.
* `workflow_raw_args` can be used to have activity arguments delivered to `execute` as
`Temporalio::Converters::RawValue`s. These are wrappers for the raw payloads that have not been converted to types
(but they have been decoded by the codec if present). They can be converted with `payload_converter` on the context.

#### Activity Context

Expand Down
52 changes: 43 additions & 9 deletions temporalio/lib/temporalio/activity/definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,47 @@ def activity_executor(executor_name)
# @param cancel_raise [Boolean] Whether to raise.
def activity_cancel_raise(cancel_raise)
unless cancel_raise.is_a?(TrueClass) || cancel_raise.is_a?(FalseClass)
raise ArgumentError,
'Must be a boolean'
raise ArgumentError, 'Must be a boolean'
end

@activity_cancel_raise = cancel_raise
end

# Set an activity as dynamic. Dynamic activities do not have names and handle any activity that is not otherwise
# registered. A worker can only have one dynamic activity. It is often useful to use {activity_raw_args} with
# this.
#
# @param value [Boolean] Whether the activity is dynamic.
def activity_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParameter
raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass)

@activity_dynamic = value
end

# Have activity arguments delivered to `execute` as {Converters::RawValue}s. These are wrappers for the raw
# payloads that have not been converted to types (but they have been decoded by the codec if present). They can
# be converted with {Context#payload_converter}.
#
# @param value [Boolean] Whether the activity accepts raw arguments.
def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter
raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass)

@activity_raw_args = value
end
end

# @!visibility private
def self._activity_definition_details
activity_name = @activity_name
raise 'Cannot have activity name specified for dynamic activity' if activity_name && @activity_dynamic

# Default to unqualified class name if not dynamic
activity_name ||= name.to_s.split('::').last unless @activity_dynamic
{
activity_name: @activity_name || name.to_s.split('::').last,
activity_name:,
activity_executor: @activity_executor || :default,
activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise
activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise,
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args
}
end

Expand All @@ -75,7 +102,7 @@ def execute(*args)
# Definition info of an activity. Activities are usually classes/instances that extend {Definition}, but
# definitions can also be manually created with a block via {initialize} here.
class Info
# @return [String, Symbol] Name of the activity.
# @return [String, Symbol, nil] Name of the activity, or nil if the activity is dynamic.
attr_reader :name

# @return [Proc] Proc for the activity.
Expand All @@ -87,6 +114,9 @@ class Info
# @return [Boolean] Whether to raise in thread/fiber on cancellation. Default is `true`.
attr_reader :cancel_raise

# @return [Boolean] Whether to use {Converters::RawValue}s as arguments.
attr_reader :raw_args

# Obtain definition info representing the given activity, which can be a class, instance, or definition info.
#
# @param activity [Definition, Class<Definition>, Info] Activity to get info for.
Expand All @@ -105,14 +135,16 @@ def self.from_activity(activity)
new(
name: details[:activity_name],
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise]
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
) { |*args| activity.new.execute(*args) } # Instantiate and call
when Definition
details = activity.class._activity_definition_details
new(
name: details[:activity_name],
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise]
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
) { |*args| activity.execute(*args) } # Just and call
when Info
activity
Expand All @@ -123,17 +155,19 @@ def self.from_activity(activity)

# Manually create activity definition info. Most users will use an instance/class of {Definition}.
#
# @param name [String, Symbol] Name of the activity.
# @param name [String, Symbol, nil] Name of the activity or nil for dynamic activity.
# @param executor [Symbol] Name of the executor.
# @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation.
# @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments.
# @yield Use this block as the activity.
def initialize(name:, executor: :default, cancel_raise: true, &block)
def initialize(name:, executor: :default, cancel_raise: true, raw_args: false, &block)
@name = name
raise ArgumentError, 'Must give block' unless block_given?

@proc = block
@executor = executor
@cancel_raise = cancel_raise
@raw_args = raw_args
end
end
end
Expand Down
21 changes: 14 additions & 7 deletions temporalio/lib/temporalio/internal/worker/activity_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'temporalio/activity'
require 'temporalio/activity/definition'
require 'temporalio/cancellation'
require 'temporalio/converters/raw_value'
require 'temporalio/internal/bridge/api'
require 'temporalio/internal/proto_utils'
require 'temporalio/scoped_logger'
Expand All @@ -29,12 +30,13 @@ def initialize(worker:, bridge_worker:)
Activity::Context.current_or_nil&._scoped_logger_info
}

# Build up activity hash by name, failing if any fail validation
# Build up activity hash by name (can be nil for dynamic), failing if any fail validation
@activities = worker.options.activities.each_with_object({}) do |act, hash|
# Class means create each time, instance means just call, definition
# does nothing special
defn = Activity::Definition::Info.from_activity(act)
# Confirm name not in use
raise ArgumentError, 'Only one dynamic activity allowed' if !defn.name && hash.key?(defn.name)
raise ArgumentError, "Multiple activities named #{defn.name}" if hash.key?(defn.name)

# Confirm executor is a known executor and let it initialize
Expand Down Expand Up @@ -91,8 +93,8 @@ def handle_task(task)
def handle_start_task(task_token, start)
set_running_activity(task_token, nil)

# Find activity definition
defn = @activities[start.activity_type]
# Find activity definition, falling back to dynamic if present
defn = @activities[start.activity_type] || @activities[nil]
if defn.nil?
raise Error::ApplicationError.new(
"Activity #{start.activity_type} for workflow #{start.workflow_execution.workflow_id} " \
Expand Down Expand Up @@ -185,10 +187,15 @@ def execute_activity(task_token, defn, start)
# Build input
input = Temporalio::Worker::Interceptor::Activity::ExecuteInput.new(
proc: defn.proc,
args: ProtoUtils.convert_from_payload_array(
@worker.options.client.data_converter,
start.input.to_ary
),
# If the activity wants raw_args, we only decode we don't convert
args: if defn.raw_args
payloads = start.input.to_ary
codec = @worker.options.client.data_converter.payload_codec
payloads = codec.decode(payloads) if codec
payloads.map { |p| Temporalio::Converters::RawValue.new(p) }
else
ProtoUtils.convert_from_payload_array(@worker.options.client.data_converter, start.input.to_ary)
end,
headers: ProtoUtils.headers_from_proto_map(start.header_fields, @worker.options.client.data_converter) || {}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def execute_activity(input)
else
raise ArgumentError, 'Activity must be a definition class, or a symbol/string'
end
raise 'Cannot invoke dynamic activities' unless activity_type

execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation) do
seq = (@activity_counter += 1)
@instance.add_command(
Expand Down Expand Up @@ -102,6 +104,8 @@ def execute_local_activity(input)
else
raise ArgumentError, 'Activity must be a definition class, or a symbol/string'
end
raise 'Cannot invoke dynamic activities' unless activity_type

execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff|
seq = (@activity_counter += 1)
@instance.add_command(
Expand Down
4 changes: 2 additions & 2 deletions temporalio/lib/temporalio/workflow/definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def workflow_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParame
end

# Have workflow arguments delivered to `execute` (and `initialize` if {workflow_init} in use) as
# {Converters::RawValue}s. These are wrappers for the raw payloads that have not been decoded. They can be
# decoded with {Workflow.payload_converter}.
# {Converters::RawValue}s. These are wrappers for the raw payloads that have not been converted to types (but
# they have been decoded by the codec if present). They can be converted with {Workflow.payload_converter}.
#
# @param value [Boolean] Whether the workflow accepts raw arguments.
def workflow_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter
Expand Down
15 changes: 10 additions & 5 deletions temporalio/sig/temporalio/activity/definition.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@ module Temporalio
def self.activity_name: (String | Symbol name) -> void
def self.activity_executor: (Symbol executor_name) -> void
def self.activity_cancel_raise: (bool cancel_raise) -> void
def self.activity_dynamic: (?bool value) -> void
def self.activity_raw_args: (?bool value) -> void

def self._activity_definition_details: -> {
activity_name: String | Symbol,
activity_name: String | Symbol | nil,
activity_executor: Symbol,
activity_cancel_raise: bool
activity_cancel_raise: bool,
activity_raw_args: bool
}

def execute: (*untyped) -> untyped

class Info
attr_reader name: String | Symbol
attr_reader name: String | Symbol | nil
attr_reader proc: Proc
attr_reader executor: Symbol
attr_reader cancel_raise: bool
attr_reader raw_args: bool

def self.from_activity: (Definition | singleton(Definition) | Info activity) -> Info

def initialize: (
name: String | Symbol,
name: String | Symbol | nil,
?executor: Symbol,
?cancel_raise: bool
?cancel_raise: bool,
?raw_args: bool
) { (?) -> untyped } -> void
end
end
Expand Down
35 changes: 35 additions & 0 deletions temporalio/test/worker_activity_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,41 @@ def test_interceptor_from_client
assert_equal ['heartbeat-val'], interceptor.calls[2][1].details
end

class DynamicActivity < Temporalio::Activity::Definition
activity_dynamic

def execute(*args)
"Activity #{Temporalio::Activity::Context.current.info.activity_type} called with #{args}"
end
end

def test_dynamic_activity
assert_equal 'Activity does-not-exist called with ["arg1", 123]',
execute_activity(DynamicActivity, 'arg1', 123, override_name: 'does-not-exist')
end

class DynamicActivityRawArgs < Temporalio::Activity::Definition
activity_dynamic
activity_raw_args

def execute(*args)
metadata_encodings, decoded_args = args.map do |arg|
raise 'Bad type' unless arg.is_a?(Temporalio::Converters::RawValue)

[arg.payload.metadata['encoding'],
Temporalio::Activity::Context.current.payload_converter.from_payload(arg.payload)]
end.transpose
"Activity #{Temporalio::Activity::Context.current.info.activity_type} called with " \
"#{decoded_args} that have encodings #{metadata_encodings}"
end
end

def test_dynamic_activity_raw_args
assert_equal 'Activity does-not-exist called with ' \
'["arg1", nil, 123] that have encodings ["json/plain", "binary/null", "json/plain"]',
execute_activity(DynamicActivityRawArgs, 'arg1', nil, 123, override_name: 'does-not-exist')
end

# steep:ignore
def execute_activity(
activity,
Expand Down

0 comments on commit 350951a

Please sign in to comment.