Skip to content

Commit

Permalink
Cloud test and support for updating API keys and RPC metadata (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jan 21, 2025
1 parent 350951a commit ef62a52
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 5 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,22 @@ jobs:
working-directory: ./temporalio
# Timeout just in case there's a hanging part in rake
timeout-minutes: 20
# Set env vars for cloud tests. If secrets aren't present, tests will be skipped.
env:
# For mTLS tests
TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}

# For API key tests
TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST: us-west-2.aws.api.temporal.io:7233
TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_API_KEY_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}

# For cloud ops tests
TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST: saas-api.tmprl.cloud:443
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00
run: bundle exec rake TESTOPTS="--verbose"
20 changes: 18 additions & 2 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
class.const_set("SERVICE_HEALTH", SERVICE_HEALTH)?;
class.define_singleton_method("async_new", function!(Client::async_new, 3))?;
class.define_method("async_invoke_rpc", method!(Client::async_invoke_rpc, -1))?;
class.define_method("update_metadata", method!(Client::update_metadata, 1))?;
class.define_method("update_api_key", method!(Client::update_api_key, 1))?;

let inner_class = class.define_error("RPCFailure", ruby.get_inner(&ROOT_ERR))?;
inner_class.define_method("code", method!(RpcFailure::code, 0))?;
Expand Down Expand Up @@ -83,10 +85,16 @@ impl Client {
pub fn async_new(runtime: &Runtime, options: Struct, queue: Value) -> Result<(), Error> {
// Build options
let mut opts_build = ClientOptionsBuilder::default();
let tls = options.child(id!("tls"))?;
opts_build
.target_url(
Url::parse(
format!("http://{}", options.member::<String>(id!("target_host"))?).as_str(),
format!(
"{}://{}",
if tls.is_some() { "https" } else { "http" },
options.member::<String>(id!("target_host"))?
)
.as_str(),
)
.map_err(|err| error!("Failed parsing host: {}", err))?,
)
Expand All @@ -95,7 +103,7 @@ impl Client {
.headers(Some(options.member(id!("rpc_metadata"))?))
.api_key(options.member(id!("api_key"))?)
.identity(options.member(id!("identity"))?);
if let Some(tls) = options.child(id!("tls"))? {
if let Some(tls) = tls {
opts_build.tls_cfg(TlsConfig {
client_tls_config: match (
tls.member::<Option<RString>>(id!("client_cert"))?,
Expand Down Expand Up @@ -223,6 +231,14 @@ impl Client {
let callback = AsyncCallback::from_queue(queue);
self.invoke_rpc(service, callback, call)
}

pub fn update_metadata(&self, headers: HashMap<String, String>) {
self.core.get_client().set_headers(headers);
}

pub fn update_api_key(&self, api_key: Option<String>) {
self.core.get_client().set_api_key(api_key);
}
}

#[derive(DataTypeFunctions, TypedData)]
Expand Down
50 changes: 47 additions & 3 deletions temporalio/lib/temporalio/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ class Options; end # rubocop:disable Lint/EmptyClass
# @!attribute domain
# @return [String, nil] SNI override. This is only needed for self-hosted servers with certificates that do not
# match the hostname being connected to.
class TLSOptions; end # rubocop:disable Lint/EmptyClass
class TLSOptions
def initialize(
client_cert: nil,
client_private_key: nil,
server_root_ca_cert: nil,
domain: nil
)
super
end
end

RPCRetryOptions = Data.define(
:initial_interval,
Expand Down Expand Up @@ -122,7 +131,9 @@ def initialize(interval: 30.0, timeout: 15.0)
# @return [String, nil] Pass for HTTP basic auth for the proxy, must be combined with {basic_auth_user}.
class HTTPConnectProxyOptions; end # rubocop:disable Lint/EmptyClass

# @return [Options] Frozen options for this client which has the same attributes as {initialize}.
# @return [Options] Frozen options for this client which has the same attributes as {initialize}. Note that if
# {api_key=} or {rpc_metadata=} are updated, the options object is replaced with those changes (it is not
# mutated in place).
attr_reader :options

# @return [WorkflowService] Raw gRPC workflow service.
Expand Down Expand Up @@ -183,6 +194,7 @@ def initialize(
lazy_connect:
).freeze
# Create core client now if not lazy
@core_client_mutex = Mutex.new
_core_client unless lazy_connect
# Create service instances
@workflow_service = WorkflowService.new(self)
Expand All @@ -206,11 +218,43 @@ def connected?
!@core_client.nil?
end

# @return [String, nil] API key. This is a shortcut for `options.api_key`.
def api_key
@options.api_key
end

# Set the API key for all future calls. This also makes a new object for {options} with the changes.
#
# @param new_key [String, nil] New API key.
def api_key=(new_key)
# Mutate the client if connected then mutate options
@core_client_mutex.synchronize do
@core_client&.update_api_key(new_key)
@options = @options.with(api_key: new_key)
end
end

# @return [Hash<String, String>] RPC metadata (aka HTTP headers). This is a shortcut for `options.rpc_metadata`.
def rpc_metadata
@options.rpc_metadata
end

# Set the RPC metadata (aka HTTP headers) for all future calls. This also makes a new object for {options} with
# the changes.
#
# @param rpc_metadata [Hash<String, String>] New API key.
def rpc_metadata=(rpc_metadata)
# Mutate the client if connected then mutate options
@core_client_mutex.synchronize do
@core_client&.update_metadata(rpc_metadata)
@options = @options.with(rpc_metadata: rpc_metadata)
end
end

# @!visibility private
def _core_client
# If lazy, this needs to be done under mutex
if @options.lazy_connect
@core_client_mutex ||= Mutex.new
@core_client_mutex.synchronize do
@core_client ||= new_core_client
end
Expand Down
4 changes: 4 additions & 0 deletions temporalio/sig/temporalio/client/connection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ module Temporalio
def target_host: -> String
def identity: -> String
def connected?: -> bool
def api_key: -> String?
def api_key=: (String? new_key) -> void
def rpc_metadata: -> Hash[String, String]
def rpc_metadata=: (Hash[String, String] rpc_metadata) -> void
def _core_client: -> Internal::Bridge::Client
private def new_core_client: -> Internal::Bridge::Client
end
Expand Down
3 changes: 3 additions & 0 deletions temporalio/sig/temporalio/internal/bridge/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ module Temporalio
queue: Queue
) -> void

def update_metadata: (Hash[String, String]) -> void
def update_api_key: (String?) -> void

class RPCFailure < Error
def code: -> Temporalio::Error::RPCError::Code::enum
def message: -> String
Expand Down
73 changes: 73 additions & 0 deletions temporalio/test/client_cloud_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

require 'securerandom'
require 'temporalio/api'
require 'temporalio/client'
require 'test'

class ClientCloudTest < Test
class SimpleWorkflow < Temporalio::Workflow::Definition
def execute(name)
"Hello, #{name}!"
end
end

def test_mtls
client_private_key = ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY', '')
skip('No cloud mTLS key') if client_private_key.empty?

client = Temporalio::Client.connect(
ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST'),
ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE'),
tls: Temporalio::Client::Connection::TLSOptions.new(
client_cert: ENV.fetch('TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT'),
client_private_key:
)
)
assert_equal 'Hello, Temporal!', execute_workflow(SimpleWorkflow, 'Temporal', client:)
end

def test_api_key
api_key = ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_API_KEY', '')
skip('No cloud API key') if api_key.empty?

client = Temporalio::Client.connect(
ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST'),
ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE'),
api_key:,
tls: true,
rpc_metadata: { 'temporal-namespace' => ENV.fetch('TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE') }
)
# Run workflow
id = "wf-#{SecureRandom.uuid}"
assert_equal 'Hello, Temporal!', execute_workflow(SimpleWorkflow, 'Temporal', id:, client:)
handle = client.workflow_handle(id)

# Confirm it can be described
assert_equal 'SimpleWorkflow', handle.describe.workflow_type

# Change API and confirm failure
client.connection.api_key = 'wrong'
assert_raises(Temporalio::Error::RPCError) { handle.describe.workflow_type }
end

def test_cloud_ops
api_key = ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_API_KEY', '')
skip('No cloud API key') if api_key.empty?

# Create connection
conn = Temporalio::Client::Connection.new(
target_host: ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST'),
api_key:,
tls: true,
rpc_metadata: { 'temporal-cloud-api-version' => ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_API_VERSION') }
)

# Simple call
namespace = ENV.fetch('TEMPORAL_CLOUD_OPS_TEST_NAMESPACE')
res = conn.cloud_service.get_namespace(
Temporalio::Api::Cloud::CloudService::V1::GetNamespaceRequest.new(namespace:)
)
assert_equal namespace, res.namespace.namespace
end
end

0 comments on commit ef62a52

Please sign in to comment.