diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 5cba37ec1..eefb6f7d8 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -357,6 +357,17 @@ export function noopEncodeMap( return map as Record | null | undefined; } +export function noopEncodeSearchAttrs( + attrs: temporal.api.common.v1.ISearchAttributes | null | undefined +): temporal.api.common.v1.ISearchAttributes | null | undefined { + if (!attrs) { + return attrs; + } + return { + indexedFields: noopEncodeMap(attrs.indexedFields), + }; +} + /** * Mark all values in the map as decoded. * Use this for headers, which we don't encode. diff --git a/packages/core-bridge/Cargo.lock b/packages/core-bridge/Cargo.lock index 483aebeb8..4022778a6 100644 --- a/packages/core-bridge/Cargo.lock +++ b/packages/core-bridge/Cargo.lock @@ -372,6 +372,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.61.2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1368,7 +1389,7 @@ dependencies = [ "futures-sink", "js-sys", "pin-project-lite", - "thiserror 2.0.18", + "thiserror", "tracing", ] @@ -1398,7 +1419,7 @@ dependencies = [ "opentelemetry_sdk", "prost", "reqwest", - "thiserror 2.0.18", + "thiserror", "tokio", "tonic", "tracing", @@ -1429,11 +1450,17 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-stream", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "os_pipe" version = "1.2.3" @@ -1467,6 +1494,43 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "pbjson" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8edd1efdd8ab23ba9cb9ace3d9987a72663d5d7c9f74fa00b51d6213645cf6c" +dependencies = [ + "base64", + "serde", +] + +[[package]] +name = "pbjson-build" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ed4d5c6ae95e08ac768883c8401cf0e8deb4e6e1d6a4e1fd3d2ec4f0ec63200" +dependencies = [ + "heck", + "itertools", + "prost", + "prost-types", +] + +[[package]] +name = "pbjson-types" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a14e2757d877c0f607a82ce1b8560e224370f159d66c5d52eb55ea187ef0350e" +dependencies = [ + "bytes", + "chrono", + "pbjson", + "pbjson-build", + "prost", + "prost-build", + "serde", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1620,8 +1684,7 @@ dependencies = [ "lazy_static", "memchr", "parking_lot", - "protobuf", - "thiserror 2.0.18", + "thiserror", ] [[package]] @@ -1723,26 +1786,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "protobuf" -version = "3.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" -dependencies = [ - "once_cell", - "protobuf-support", - "thiserror 1.0.69", -] - -[[package]] -name = "protobuf-support" -version = "3.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" -dependencies = [ - "thiserror 1.0.69", -] - [[package]] name = "pulldown-cmark" version = "0.13.0" @@ -1792,7 +1835,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror 2.0.18", + "thiserror", "tokio", "tracing", "web-time", @@ -1813,7 +1856,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.18", + "thiserror", "tinyvec", "tracing", "web-time", @@ -1934,6 +1977,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror", +] + [[package]] name = "regex" version = "1.12.3" @@ -2211,6 +2265,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2406,7 +2469,7 @@ dependencies = [ "temporalio-client", "temporalio-common", "temporalio-sdk-core", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-stream", "tonic", @@ -2436,7 +2499,7 @@ dependencies = [ "rand 0.9.2", "slotmap", "temporalio-common", - "thiserror 2.0.18", + "thiserror", "tokio", "tonic", "tower", @@ -2453,20 +2516,40 @@ dependencies = [ "async-trait", "base64", "bon", + "crc32fast", "derive_more", + "dirs", + "erased-serde", + "futures", + "futures-channel", + "http-body-util", + "hyper", + "hyper-util", "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "parking_lot", + "pbjson", + "pbjson-build", + "pbjson-types", + "prometheus", "prost", + "prost-types", "prost-wkt", "prost-wkt-types", "rand 0.9.2", + "ringbuf", "serde", "serde_json", - "thiserror 2.0.18", + "thiserror", + "tokio", + "toml", "tonic", "tonic-prost", "tonic-prost-build", "tracing", "tracing-core", + "tracing-subscriber", "url", "uuid", ] @@ -2497,13 +2580,10 @@ dependencies = [ "enum-iterator", "enum_dispatch", "flate2", - "futures-channel", + "futures", "futures-util", "gethostname", "governor", - "http-body-util", - "hyper", - "hyper-util", "itertools", "lru", "mockall", @@ -2513,12 +2593,10 @@ dependencies = [ "parking_lot", "pid", "pin-project", - "prometheus", "prost", "prost-wkt-types", "rand 0.9.2", "reqwest", - "ringbuf", "serde", "serde_json", "siphasher", @@ -2528,13 +2606,12 @@ dependencies = [ "temporalio-client", "temporalio-common", "temporalio-macros", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-stream", "tokio-util", "tonic", "tracing", - "tracing-subscriber", "url", "uuid", "zip", @@ -2546,33 +2623,13 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -2682,6 +2739,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.12+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + +[[package]] +name = "toml_datetime" +version = "0.7.5+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_parser" +version = "1.0.9+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.6+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" + [[package]] name = "tonic" version = "0.14.3" @@ -3411,6 +3507,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 12c656f7b..050776728 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 12c656f7b2dce2a139bb3f1b4873c7007a751380 +Subproject commit 05077672867822042737bc44bd9b0c9aa6aee993 diff --git a/packages/core-bridge/src/client.rs b/packages/core-bridge/src/client.rs index 385a303d8..55fcfc1d3 100644 --- a/packages/core-bridge/src/client.rs +++ b/packages/core-bridge/src/client.rs @@ -5,10 +5,10 @@ use std::{collections::HashMap, sync::Arc}; use neon::prelude::*; use tonic::metadata::{BinaryMetadataValue, MetadataKey}; -use temporalio_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient}; +use temporalio_sdk_core::CoreRuntime; use bridge_macros::{TryFromJs, js_function}; -use temporalio_client::{ClientInitError, ConfiguredClient, TemporalServiceClient}; +use temporalio_client::{Connection, errors::ClientConnectError}; use crate::runtime::Runtime; use crate::{helpers::*, runtime::RuntimeExt as _}; @@ -38,12 +38,10 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult< Ok(()) } -type CoreClient = RetryClient>; - pub struct Client { // These fields are pub because they are accessed from Worker::new pub(crate) core_runtime: Arc, - pub(crate) core_client: CoreClient, + pub(crate) core_connection: Connection, } /// Create a connected gRPC client which can be used to initialize workers. @@ -53,33 +51,32 @@ pub fn client_new( config: config::ClientOptions, ) -> BridgeResult>> { let runtime = runtime.borrow()?.core_runtime.clone(); - let config: CoreClientOptions = config.try_into()?; + let metric_meter = runtime.telemetry().get_temporal_metric_meter(); + let options = config.into_connection_options(metric_meter); runtime.clone().future_to_promise(async move { - let metric_meter = runtime.clone().telemetry().get_temporal_metric_meter(); - let res = config.connect_no_namespace(metric_meter).await; - - let core_client = match res { - Ok(core_client) => core_client, - Err(ClientInitError::InvalidHeaders(e)) => Err(BridgeError::TypeError { + let core_connection = match Connection::connect(options).await { + Ok(conn) => conn, + Err(ClientConnectError::InvalidHeaders(e)) => Err(BridgeError::TypeError { message: format!("Invalid metadata key: {e}"), field: None, })?, - Err(ClientInitError::SystemInfoCallError(e)) => Err(BridgeError::TransportError( + Err(ClientConnectError::SystemInfoCallError(e)) => Err(BridgeError::TransportError( format!("Failed to call GetSystemInfo: {e}"), ))?, - Err(ClientInitError::TonicTransportError(e)) => { + Err(ClientConnectError::TonicTransportError(e)) => { Err(BridgeError::TransportError(format!("{e:?}")))? } - Err(ClientInitError::InvalidUri(e)) => Err(BridgeError::TypeError { + Err(ClientConnectError::InvalidUri(e)) => Err(BridgeError::TypeError { message: e.to_string(), field: None, })?, + Err(e) => Err(BridgeError::TransportError(format!("{e:?}")))?, }; Ok(OpaqueOutboundHandle::new(Client { core_runtime: runtime, - core_client, + core_connection, })) }) } @@ -93,8 +90,7 @@ pub fn client_update_headers( let (ascii_headers, bin_headers) = config::partition_headers(Some(headers)); client .borrow()? - .core_client - .get_client() + .core_connection .set_headers(ascii_headers.unwrap_or_default()) .map_err(|err| BridgeError::TypeError { message: format!("Invalid metadata key: {err}"), @@ -102,8 +98,7 @@ pub fn client_update_headers( })?; client .borrow()? - .core_client - .get_client() + .core_connection .set_binary_headers(bin_headers.unwrap_or_default()) .map_err(|err| BridgeError::TypeError { message: format!("Invalid metadata key: {err}"), @@ -115,11 +110,7 @@ pub fn client_update_headers( /// Update a Client's API key #[js_function] pub fn client_update_api_key(client: OpaqueInboundHandle, key: String) -> BridgeResult<()> { - client - .borrow()? - .core_client - .get_client() - .set_api_key(Some(key)); + client.borrow()?.core_connection.set_api_key(Some(key)); Ok(()) } @@ -158,11 +149,12 @@ pub fn client_send_workflow_service_request( ) -> BridgeResult>> { let client = client.borrow()?; let core_runtime = client.core_runtime.clone(); - let core_client = client.core_client.clone(); + let core_connection = client.core_connection.clone(); // FIXME: "large future with a size of 18560 bytes" - core_runtime - .future_to_promise(async move { client_invoke_workflow_service(core_client, call).await }) + core_runtime.future_to_promise(async move { + client_invoke_workflow_service(core_connection, call).await + }) } /// Send a request to the Operator Service using the provided Client @@ -173,10 +165,11 @@ pub fn client_send_operator_service_request( ) -> BridgeResult>> { let client = client.borrow()?; let core_runtime = client.core_runtime.clone(); - let core_client = client.core_client.clone(); + let core_connection = client.core_connection.clone(); - core_runtime - .future_to_promise(async move { client_invoke_operator_service(core_client, call).await }) + core_runtime.future_to_promise(async move { + client_invoke_operator_service(core_connection, call).await + }) } /// Send a request to the Test Service using the provided Client @@ -187,10 +180,10 @@ pub fn client_send_test_service_request( ) -> BridgeResult>> { let client = client.borrow()?; let core_runtime = client.core_runtime.clone(); - let core_client = client.core_client.clone(); + let core_connection = client.core_connection.clone(); core_runtime - .future_to_promise(async move { client_invoke_test_service(core_client, call).await }) + .future_to_promise(async move { client_invoke_test_service(core_connection, call).await }) } /// Send a request to the Health Service using the provided Client @@ -201,10 +194,10 @@ pub fn client_send_health_service_request( ) -> BridgeResult>> { let client = client.borrow()?; let core_runtime = client.core_runtime.clone(); - let core_client = client.core_client.clone(); + let core_connection = client.core_connection.clone(); core_runtime - .future_to_promise(async move { client_invoke_health_service(core_client, call).await }) + .future_to_promise(async move { client_invoke_health_service(core_connection, call).await }) } /// Indicates that a gRPC request failed @@ -240,11 +233,19 @@ impl TryIntoJs for tonic::Status { } macro_rules! rpc_call { - ($retry_client:ident, $call:ident, $call_name:ident) => { + ($connection:ident, $call:ident, $call_name:ident) => { + rpc_call!($connection, $call, $call_name, workflow_service) + }; + ($connection:ident, $call:ident, $call_name:ident, $service_accessor:ident) => { if $call.retry { - rpc_resp($retry_client.$call_name(rpc_req($call)?).await) + rpc_resp($connection.$call_name(rpc_req($call)?).await) } else { - rpc_resp($retry_client.into_inner().$call_name(rpc_req($call)?).await) + rpc_resp( + $connection + .$service_accessor() + .$call_name(rpc_req($call)?) + .await, + ) } }; } @@ -254,297 +255,166 @@ macro_rules! rpc_call { #[allow(clippy::large_stack_frames)] #[allow(clippy::too_many_lines)] async fn client_invoke_workflow_service( - mut retry_client: CoreClient, + mut connection: Connection, call: RpcCall, ) -> BridgeResult> { - use temporalio_client::WorkflowService; + use temporalio_client::grpc::WorkflowService; match call.rpc.as_str() { - "CountActivityExecutions" => { - rpc_call!(retry_client, call, count_activity_executions) - } - "CountWorkflowExecutions" => { - rpc_call!(retry_client, call, count_workflow_executions) - } - "CreateSchedule" => { - rpc_call!(retry_client, call, create_schedule) - } - "CreateWorkflowRule" => { - rpc_call!(retry_client, call, create_workflow_rule) - } - "DeleteActivityExecution" => { - rpc_call!(retry_client, call, delete_activity_execution) - } - "DeleteSchedule" => { - rpc_call!(retry_client, call, delete_schedule) - } - "DeleteWorkerDeployment" => { - rpc_call!(retry_client, call, delete_worker_deployment) - } + "CountActivityExecutions" => rpc_call!(connection, call, count_activity_executions), + "CountSchedules" => rpc_call!(connection, call, count_schedules), + "CountWorkflowExecutions" => rpc_call!(connection, call, count_workflow_executions), + "CreateSchedule" => rpc_call!(connection, call, create_schedule), + "CreateWorkflowRule" => rpc_call!(connection, call, create_workflow_rule), + "DeleteActivityExecution" => rpc_call!(connection, call, delete_activity_execution), + "DeleteSchedule" => rpc_call!(connection, call, delete_schedule), + "DeleteWorkerDeployment" => rpc_call!(connection, call, delete_worker_deployment), "DeleteWorkerDeploymentVersion" => { - rpc_call!(retry_client, call, delete_worker_deployment_version) - } - "DeleteWorkflowExecution" => { - rpc_call!(retry_client, call, delete_workflow_execution) - } - "DeleteWorkflowRule" => { - rpc_call!(retry_client, call, delete_workflow_rule) - } - "DescribeBatchOperation" => { - rpc_call!(retry_client, call, describe_batch_operation) - } - "DescribeActivityExecution" => { - rpc_call!(retry_client, call, describe_activity_execution) - } - "DescribeDeployment" => { - rpc_call!(retry_client, call, describe_deployment) - } - "DescribeWorker" => { - rpc_call!(retry_client, call, describe_worker) - } - "DeprecateNamespace" => rpc_call!(retry_client, call, deprecate_namespace), - "DescribeNamespace" => rpc_call!(retry_client, call, describe_namespace), - "DescribeSchedule" => rpc_call!(retry_client, call, describe_schedule), - "DescribeTaskQueue" => rpc_call!(retry_client, call, describe_task_queue), - "DescribeWorkerDeployment" => { - rpc_call!(retry_client, call, describe_worker_deployment) - } + rpc_call!(connection, call, delete_worker_deployment_version) + } + "DeleteWorkflowExecution" => rpc_call!(connection, call, delete_workflow_execution), + "DeleteWorkflowRule" => rpc_call!(connection, call, delete_workflow_rule), + "DescribeBatchOperation" => rpc_call!(connection, call, describe_batch_operation), + "DescribeActivityExecution" => rpc_call!(connection, call, describe_activity_execution), + "DescribeDeployment" => rpc_call!(connection, call, describe_deployment), + "DescribeWorker" => rpc_call!(connection, call, describe_worker), + "DeprecateNamespace" => rpc_call!(connection, call, deprecate_namespace), + "DescribeNamespace" => rpc_call!(connection, call, describe_namespace), + "DescribeSchedule" => rpc_call!(connection, call, describe_schedule), + "DescribeTaskQueue" => rpc_call!(connection, call, describe_task_queue), + "DescribeWorkerDeployment" => rpc_call!(connection, call, describe_worker_deployment), "DescribeWorkerDeploymentVersion" => { - rpc_call!(retry_client, call, describe_worker_deployment_version) - } - "DescribeWorkflowExecution" => { - rpc_call!(retry_client, call, describe_workflow_execution) - } - "DescribeWorkflowRule" => { - rpc_call!(retry_client, call, describe_workflow_rule) - } - "ExecuteMultiOperation" => rpc_call!(retry_client, call, execute_multi_operation), - "FetchWorkerConfig" => rpc_call!(retry_client, call, fetch_worker_config), - "GetClusterInfo" => rpc_call!(retry_client, call, get_cluster_info), - "GetCurrentDeployment" => rpc_call!(retry_client, call, get_current_deployment), - "GetDeploymentReachability" => { - rpc_call!(retry_client, call, get_deployment_reachability) - } - "GetSearchAttributes" => { - rpc_call!(retry_client, call, get_search_attributes) - } - "GetSystemInfo" => rpc_call!(retry_client, call, get_system_info), + rpc_call!(connection, call, describe_worker_deployment_version) + } + "DescribeWorkflowExecution" => rpc_call!(connection, call, describe_workflow_execution), + "DescribeWorkflowRule" => rpc_call!(connection, call, describe_workflow_rule), + "ExecuteMultiOperation" => rpc_call!(connection, call, execute_multi_operation), + "FetchWorkerConfig" => rpc_call!(connection, call, fetch_worker_config), + "GetClusterInfo" => rpc_call!(connection, call, get_cluster_info), + "GetCurrentDeployment" => rpc_call!(connection, call, get_current_deployment), + "GetDeploymentReachability" => rpc_call!(connection, call, get_deployment_reachability), + "GetSearchAttributes" => rpc_call!(connection, call, get_search_attributes), + "GetSystemInfo" => rpc_call!(connection, call, get_system_info), "GetWorkerBuildIdCompatibility" => { - rpc_call!(retry_client, call, get_worker_build_id_compatibility) - } - "GetWorkerTaskReachability" => { - rpc_call!(retry_client, call, get_worker_task_reachability) - } - "GetWorkerVersioningRules" => { - rpc_call!(retry_client, call, get_worker_versioning_rules) + rpc_call!(connection, call, get_worker_build_id_compatibility) } + "GetWorkerTaskReachability" => rpc_call!(connection, call, get_worker_task_reachability), + "GetWorkerVersioningRules" => rpc_call!(connection, call, get_worker_versioning_rules), "GetWorkflowExecutionHistory" => { - rpc_call!(retry_client, call, get_workflow_execution_history) + rpc_call!(connection, call, get_workflow_execution_history) } "GetWorkflowExecutionHistoryReverse" => { - rpc_call!(retry_client, call, get_workflow_execution_history_reverse) + rpc_call!(connection, call, get_workflow_execution_history_reverse) } "ListArchivedWorkflowExecutions" => { - rpc_call!(retry_client, call, list_archived_workflow_executions) - } - "ListActivityExecutions" => { - rpc_call!(retry_client, call, list_activity_executions) - } - "ListBatchOperations" => { - rpc_call!(retry_client, call, list_batch_operations) + rpc_call!(connection, call, list_archived_workflow_executions) } + "ListActivityExecutions" => rpc_call!(connection, call, list_activity_executions), + "ListBatchOperations" => rpc_call!(connection, call, list_batch_operations), "ListClosedWorkflowExecutions" => { - rpc_call!(retry_client, call, list_closed_workflow_executions) - } - "ListDeployments" => { - rpc_call!(retry_client, call, list_deployments) - } - "ListNamespaces" => rpc_call!(retry_client, call, list_namespaces), - "ListOpenWorkflowExecutions" => { - rpc_call!(retry_client, call, list_open_workflow_executions) - } - "ListScheduleMatchingTimes" => { - rpc_call!(retry_client, call, list_schedule_matching_times) - } - "ListSchedules" => { - rpc_call!(retry_client, call, list_schedules) - } - "ListTaskQueuePartitions" => { - rpc_call!(retry_client, call, list_task_queue_partitions) - } - "ListWorkerDeployments" => { - rpc_call!(retry_client, call, list_worker_deployments) - } - "ListWorkers" => { - rpc_call!(retry_client, call, list_workers) - } - "ListWorkflowExecutions" => { - rpc_call!(retry_client, call, list_workflow_executions) - } - "ListWorkflowRules" => { - rpc_call!(retry_client, call, list_workflow_rules) - } - "PatchSchedule" => { - rpc_call!(retry_client, call, patch_schedule) - } - "PauseActivity" => { - rpc_call!(retry_client, call, pause_activity) - } - "PauseWorkflowExecution" => { - rpc_call!(retry_client, call, pause_workflow_execution) - } - "PollActivityExecution" => { - rpc_call!(retry_client, call, poll_activity_execution) - } - "PollActivityTaskQueue" => { - rpc_call!(retry_client, call, poll_activity_task_queue) - } - "PollNexusTaskQueue" => rpc_call!(retry_client, call, poll_nexus_task_queue), + rpc_call!(connection, call, list_closed_workflow_executions) + } + "ListDeployments" => rpc_call!(connection, call, list_deployments), + "ListNamespaces" => rpc_call!(connection, call, list_namespaces), + "ListOpenWorkflowExecutions" => rpc_call!(connection, call, list_open_workflow_executions), + "ListScheduleMatchingTimes" => rpc_call!(connection, call, list_schedule_matching_times), + "ListSchedules" => rpc_call!(connection, call, list_schedules), + "ListTaskQueuePartitions" => rpc_call!(connection, call, list_task_queue_partitions), + "ListWorkerDeployments" => rpc_call!(connection, call, list_worker_deployments), + "ListWorkers" => rpc_call!(connection, call, list_workers), + "ListWorkflowExecutions" => rpc_call!(connection, call, list_workflow_executions), + "ListWorkflowRules" => rpc_call!(connection, call, list_workflow_rules), + "PatchSchedule" => rpc_call!(connection, call, patch_schedule), + "PauseActivity" => rpc_call!(connection, call, pause_activity), + "PauseWorkflowExecution" => rpc_call!(connection, call, pause_workflow_execution), + "PollActivityExecution" => rpc_call!(connection, call, poll_activity_execution), + "PollActivityTaskQueue" => rpc_call!(connection, call, poll_activity_task_queue), + "PollNexusTaskQueue" => rpc_call!(connection, call, poll_nexus_task_queue), "PollWorkflowExecutionUpdate" => { - rpc_call!(retry_client, call, poll_workflow_execution_update) + rpc_call!(connection, call, poll_workflow_execution_update) } - "PollWorkflowTaskQueue" => { - rpc_call!(retry_client, call, poll_workflow_task_queue) - } - "QueryWorkflow" => rpc_call!(retry_client, call, query_workflow), + "PollWorkflowTaskQueue" => rpc_call!(connection, call, poll_workflow_task_queue), + "QueryWorkflow" => rpc_call!(connection, call, query_workflow), "RecordActivityTaskHeartbeat" => { - rpc_call!(retry_client, call, record_activity_task_heartbeat) + rpc_call!(connection, call, record_activity_task_heartbeat) } "RecordActivityTaskHeartbeatById" => { - rpc_call!(retry_client, call, record_activity_task_heartbeat_by_id) - } - "RecordWorkerHeartbeat" => { - rpc_call!(retry_client, call, record_worker_heartbeat) + rpc_call!(connection, call, record_activity_task_heartbeat_by_id) } - "RegisterNamespace" => rpc_call!(retry_client, call, register_namespace), + "RecordWorkerHeartbeat" => rpc_call!(connection, call, record_worker_heartbeat), + "RegisterNamespace" => rpc_call!(connection, call, register_namespace), "RequestCancelActivityExecution" => { - rpc_call!(retry_client, call, request_cancel_activity_execution) + rpc_call!(connection, call, request_cancel_activity_execution) } "RequestCancelWorkflowExecution" => { - rpc_call!(retry_client, call, request_cancel_workflow_execution) - } - "ResetActivity" => { - rpc_call!(retry_client, call, reset_activity) - } - "ResetStickyTaskQueue" => { - rpc_call!(retry_client, call, reset_sticky_task_queue) - } - "ResetWorkflowExecution" => { - rpc_call!(retry_client, call, reset_workflow_execution) + rpc_call!(connection, call, request_cancel_workflow_execution) } + "ResetActivity" => rpc_call!(connection, call, reset_activity), + "ResetStickyTaskQueue" => rpc_call!(connection, call, reset_sticky_task_queue), + "ResetWorkflowExecution" => rpc_call!(connection, call, reset_workflow_execution), "RespondActivityTaskCanceled" => { - rpc_call!(retry_client, call, respond_activity_task_canceled) + rpc_call!(connection, call, respond_activity_task_canceled) } "RespondActivityTaskCanceledById" => { - rpc_call!(retry_client, call, respond_activity_task_canceled_by_id) + rpc_call!(connection, call, respond_activity_task_canceled_by_id) } "RespondActivityTaskCompleted" => { - rpc_call!(retry_client, call, respond_activity_task_completed) + rpc_call!(connection, call, respond_activity_task_completed) } "RespondActivityTaskCompletedById" => { - rpc_call!(retry_client, call, respond_activity_task_completed_by_id) - } - "RespondActivityTaskFailed" => { - rpc_call!(retry_client, call, respond_activity_task_failed) + rpc_call!(connection, call, respond_activity_task_completed_by_id) } + "RespondActivityTaskFailed" => rpc_call!(connection, call, respond_activity_task_failed), "RespondActivityTaskFailedById" => { - rpc_call!(retry_client, call, respond_activity_task_failed_by_id) - } - "RespondNexusTaskCompleted" => { - rpc_call!(retry_client, call, respond_nexus_task_completed) - } - "RespondNexusTaskFailed" => { - rpc_call!(retry_client, call, respond_nexus_task_failed) - } - "RespondQueryTaskCompleted" => { - rpc_call!(retry_client, call, respond_query_task_completed) + rpc_call!(connection, call, respond_activity_task_failed_by_id) } + "RespondNexusTaskCompleted" => rpc_call!(connection, call, respond_nexus_task_completed), + "RespondNexusTaskFailed" => rpc_call!(connection, call, respond_nexus_task_failed), + "RespondQueryTaskCompleted" => rpc_call!(connection, call, respond_query_task_completed), "RespondWorkflowTaskCompleted" => { - rpc_call!(retry_client, call, respond_workflow_task_completed) - } - "RespondWorkflowTaskFailed" => { - rpc_call!(retry_client, call, respond_workflow_task_failed) - } - "ScanWorkflowExecutions" => { - rpc_call!(retry_client, call, scan_workflow_executions) - } - "SetCurrentDeployment" => { - rpc_call!(retry_client, call, set_current_deployment) + rpc_call!(connection, call, respond_workflow_task_completed) } + "RespondWorkflowTaskFailed" => rpc_call!(connection, call, respond_workflow_task_failed), + "ScanWorkflowExecutions" => rpc_call!(connection, call, scan_workflow_executions), + "SetCurrentDeployment" => rpc_call!(connection, call, set_current_deployment), "SetWorkerDeploymentCurrentVersion" => { - rpc_call!(retry_client, call, set_worker_deployment_current_version) - } - "SetWorkerDeploymentManager" => { - rpc_call!(retry_client, call, set_worker_deployment_manager) + rpc_call!(connection, call, set_worker_deployment_current_version) } + "SetWorkerDeploymentManager" => rpc_call!(connection, call, set_worker_deployment_manager), "SetWorkerDeploymentRampingVersion" => { - rpc_call!(retry_client, call, set_worker_deployment_ramping_version) - } - "ShutdownWorker" => { - rpc_call!(retry_client, call, shutdown_worker) + rpc_call!(connection, call, set_worker_deployment_ramping_version) } + "ShutdownWorker" => rpc_call!(connection, call, shutdown_worker), "SignalWithStartWorkflowExecution" => { - rpc_call!(retry_client, call, signal_with_start_workflow_execution) - } - "SignalWorkflowExecution" => { - rpc_call!(retry_client, call, signal_workflow_execution) - } - "StartActivityExecution" => { - rpc_call!(retry_client, call, start_activity_execution) - } - "StartWorkflowExecution" => { - rpc_call!(retry_client, call, start_workflow_execution) - } - "StartBatchOperation" => { - rpc_call!(retry_client, call, start_batch_operation) - } - "StopBatchOperation" => { - rpc_call!(retry_client, call, stop_batch_operation) - } - "TerminateActivityExecution" => { - rpc_call!(retry_client, call, terminate_activity_execution) - } - "TerminateWorkflowExecution" => { - rpc_call!(retry_client, call, terminate_workflow_execution) - } - "TriggerWorkflowRule" => { - rpc_call!(retry_client, call, trigger_workflow_rule) - } - "UnpauseActivity" => { - rpc_call!(retry_client, call, unpause_activity) - } - "UnpauseWorkflowExecution" => { - rpc_call!(retry_client, call, unpause_workflow_execution) - } - "UpdateActivityOptions" => { - rpc_call!(retry_client, call, update_activity_options) - } - "UpdateNamespace" => { - rpc_call!(retry_client, call, update_namespace) - } - "UpdateSchedule" => rpc_call!(retry_client, call, update_schedule), - "UpdateWorkerConfig" => rpc_call!(retry_client, call, update_worker_config), + rpc_call!(connection, call, signal_with_start_workflow_execution) + } + "SignalWorkflowExecution" => rpc_call!(connection, call, signal_workflow_execution), + "StartActivityExecution" => rpc_call!(connection, call, start_activity_execution), + "StartWorkflowExecution" => rpc_call!(connection, call, start_workflow_execution), + "StartBatchOperation" => rpc_call!(connection, call, start_batch_operation), + "StopBatchOperation" => rpc_call!(connection, call, stop_batch_operation), + "TerminateActivityExecution" => rpc_call!(connection, call, terminate_activity_execution), + "TerminateWorkflowExecution" => rpc_call!(connection, call, terminate_workflow_execution), + "TriggerWorkflowRule" => rpc_call!(connection, call, trigger_workflow_rule), + "UnpauseActivity" => rpc_call!(connection, call, unpause_activity), + "UnpauseWorkflowExecution" => rpc_call!(connection, call, unpause_workflow_execution), + "UpdateActivityOptions" => rpc_call!(connection, call, update_activity_options), + "UpdateNamespace" => rpc_call!(connection, call, update_namespace), + "UpdateSchedule" => rpc_call!(connection, call, update_schedule), + "UpdateWorkerConfig" => rpc_call!(connection, call, update_worker_config), "UpdateWorkerDeploymentVersionMetadata" => { - rpc_call!( - retry_client, - call, - update_worker_deployment_version_metadata - ) - } - "UpdateTaskQueueConfig" => { - rpc_call!(retry_client, call, update_task_queue_config) - } - "UpdateWorkflowExecution" => { - rpc_call!(retry_client, call, update_workflow_execution) + rpc_call!(connection, call, update_worker_deployment_version_metadata) } + "UpdateTaskQueueConfig" => rpc_call!(connection, call, update_task_queue_config), + "UpdateWorkflowExecution" => rpc_call!(connection, call, update_workflow_execution), "UpdateWorkflowExecutionOptions" => { - rpc_call!(retry_client, call, update_workflow_execution_options) + rpc_call!(connection, call, update_workflow_execution_options) } "UpdateWorkerBuildIdCompatibility" => { - rpc_call!(retry_client, call, update_worker_build_id_compatibility) + rpc_call!(connection, call, update_worker_build_id_compatibility) } "UpdateWorkerVersioningRules" => { - rpc_call!(retry_client, call, update_worker_versioning_rules) + rpc_call!(connection, call, update_worker_versioning_rules) } _ => Err(BridgeError::TypeError { field: None, @@ -555,36 +425,47 @@ async fn client_invoke_workflow_service( #[allow(clippy::cognitive_complexity)] async fn client_invoke_operator_service( - mut retry_client: CoreClient, + mut connection: Connection, call: RpcCall, ) -> BridgeResult> { - use temporalio_client::OperatorService; + use temporalio_client::grpc::OperatorService; match call.rpc.as_str() { "AddOrUpdateRemoteCluster" => { - rpc_call!(retry_client, call, add_or_update_remote_cluster) + rpc_call!( + connection, + call, + add_or_update_remote_cluster, + operator_service + ) } "AddSearchAttributes" => { - rpc_call!(retry_client, call, add_search_attributes) + rpc_call!(connection, call, add_search_attributes, operator_service) + } + "CreateNexusEndpoint" => { + rpc_call!(connection, call, create_nexus_endpoint, operator_service) } - "CreateNexusEndpoint" => rpc_call!(retry_client, call, create_nexus_endpoint), "DeleteNamespace" => { - rpc_call!(retry_client, call, delete_namespace) + rpc_call!(connection, call, delete_namespace, operator_service) + } + "DeleteNexusEndpoint" => { + rpc_call!(connection, call, delete_nexus_endpoint, operator_service) } - "DeleteNexusEndpoint" => rpc_call!(retry_client, call, delete_nexus_endpoint), - "GetNexusEndpoint" => rpc_call!(retry_client, call, get_nexus_endpoint), - "ListClusters" => rpc_call!(retry_client, call, list_clusters), - "ListNexusEndpoints" => rpc_call!(retry_client, call, list_nexus_endpoints), + "GetNexusEndpoint" => rpc_call!(connection, call, get_nexus_endpoint, operator_service), + "ListClusters" => rpc_call!(connection, call, list_clusters, operator_service), + "ListNexusEndpoints" => rpc_call!(connection, call, list_nexus_endpoints, operator_service), "ListSearchAttributes" => { - rpc_call!(retry_client, call, list_search_attributes) + rpc_call!(connection, call, list_search_attributes, operator_service) } "RemoveRemoteCluster" => { - rpc_call!(retry_client, call, remove_remote_cluster) + rpc_call!(connection, call, remove_remote_cluster, operator_service) } "RemoveSearchAttributes" => { - rpc_call!(retry_client, call, remove_search_attributes) + rpc_call!(connection, call, remove_search_attributes, operator_service) + } + "UpdateNexusEndpoint" => { + rpc_call!(connection, call, update_nexus_endpoint, operator_service) } - "UpdateNexusEndpoint" => rpc_call!(retry_client, call, update_nexus_endpoint), _ => Err(BridgeError::TypeError { field: None, message: format!("Unknown RPC call {}", call.rpc), @@ -593,20 +474,25 @@ async fn client_invoke_operator_service( } async fn client_invoke_test_service( - mut retry_client: CoreClient, + mut connection: Connection, call: RpcCall, ) -> BridgeResult> { - use temporalio_client::TestService; + use temporalio_client::grpc::TestService; match call.rpc.as_str() { - "GetCurrentTime" => rpc_call!(retry_client, call, get_current_time), - "LockTimeSkipping" => rpc_call!(retry_client, call, lock_time_skipping), - "SleepUntil" => rpc_call!(retry_client, call, sleep_until), - "Sleep" => rpc_call!(retry_client, call, sleep), + "GetCurrentTime" => rpc_call!(connection, call, get_current_time, test_service), + "LockTimeSkipping" => rpc_call!(connection, call, lock_time_skipping, test_service), + "SleepUntil" => rpc_call!(connection, call, sleep_until, test_service), + "Sleep" => rpc_call!(connection, call, sleep, test_service), "UnlockTimeSkippingWithSleep" => { - rpc_call!(retry_client, call, unlock_time_skipping_with_sleep) + rpc_call!( + connection, + call, + unlock_time_skipping_with_sleep, + test_service + ) } - "UnlockTimeSkipping" => rpc_call!(retry_client, call, unlock_time_skipping), + "UnlockTimeSkipping" => rpc_call!(connection, call, unlock_time_skipping, test_service), _ => Err(BridgeError::TypeError { field: None, message: format!("Unknown RPC call {}", call.rpc), @@ -615,13 +501,13 @@ async fn client_invoke_test_service( } async fn client_invoke_health_service( - mut retry_client: CoreClient, + mut connection: Connection, call: RpcCall, ) -> BridgeResult> { - use temporalio_client::HealthService; + use temporalio_client::grpc::HealthService; match call.rpc.as_str() { - "Check" => rpc_call!(retry_client, call, check), + "Check" => rpc_call!(connection, call, check, health_service), // Intentionally ignore 'watch' because it's a streaming method, which is not currently // supported by the macro and client-side code, and not needed anyway for any SDK use case. _ => Err(BridgeError::TypeError { @@ -686,15 +572,16 @@ where mod config { use std::collections::HashMap; - use temporalio_client::HttpConnectProxyOptions; - use temporalio_sdk_core::{ - ClientOptions as CoreClientOptions, ClientTlsOptions as CoreClientTlsOptions, - TlsOptions as CoreTlsOptions, Url, + use temporalio_client::{ + ClientTlsOptions as CoreClientTlsOptions, ConnectionOptions, HttpConnectProxyOptions, + TlsOptions as CoreTlsOptions, }; + use temporalio_common::telemetry::metrics::TemporalMeter; + use temporalio_sdk_core::Url; use bridge_macros::TryFromJs; - use crate::{client::MetadataValue, helpers::*}; + use crate::client::MetadataValue; #[derive(Debug, Clone, TryFromJs)] pub(super) struct ClientOptions { @@ -734,13 +621,14 @@ mod config { password: String, } - impl TryInto for ClientOptions { - type Error = BridgeError; - fn try_into(self) -> Result { + impl ClientOptions { + pub(super) fn into_connection_options( + self, + metrics_meter: Option, + ) -> ConnectionOptions { let (ascii_headers, bin_headers) = partition_headers(self.headers); - let client_options = CoreClientOptions::builder() - .target_url(self.target_url) + ConnectionOptions::new(self.target_url) .client_name(self.client_name) .client_version(self.client_version) .maybe_tls_options(self.tls.map(Into::into)) @@ -748,15 +636,14 @@ mod config { .maybe_headers(ascii_headers) .maybe_binary_headers(bin_headers) .maybe_api_key(self.api_key) + .maybe_metrics_meter(metrics_meter) .disable_error_code_metric_tags(self.disable_error_code_metric_tags) // identity -- skipped: will be set on worker // retry_config -- skipped: worker overrides anyway // override_origin -- skipped: will default to tls_cfg.domain // keep_alive -- skipped: defaults to true; is there any reason to disable this? // skip_get_system_info -- skipped: defaults to false; is there any reason to set this? - .build(); - - Ok(client_options) + .build() } } diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs index 182cac487..e0648775e 100644 --- a/packages/core-bridge/src/metrics.rs +++ b/packages/core-bridge/src/metrics.rs @@ -5,18 +5,15 @@ use std::sync::Arc; use neon::prelude::*; use serde::Deserialize; -use temporalio_common::telemetry::metrics::{ - BufferInstrumentRef as CoreBufferInstrumentRef, CoreMeter, Counter as CoreCounter, - CustomMetricAttributes, Gauge as CoreGauge, Histogram as CoreHistogram, MetricCallBufferer, - MetricEvent as CoreMetricEvent, MetricKind as CoreMetricKind, - MetricParameters as CoreMetricParameters, NewAttributes, TemporalMeter, +use temporalio_common::telemetry::metrics::core::{ + BufferInstrumentRef as CoreBufferInstrumentRef, CustomMetricAttributes, MetricCallBufferer, + MetricEvent as CoreMetricEvent, MetricKind as CoreMetricKind, MetricUpdateVal, }; use temporalio_common::telemetry::metrics::{ - GaugeF64 as CoreGaugeF64, HistogramF64 as CoreHistogramF64, -}; -use temporalio_common::telemetry::{ - metrics, - metrics::{MetricKeyValue as CoreMetricKeyValue, MetricValue as CoreMetricValue}, + Counter as CoreCounter, Gauge as CoreGauge, GaugeF64 as CoreGaugeF64, + Histogram as CoreHistogram, HistogramF64 as CoreHistogramF64, + MetricKeyValue as CoreMetricKeyValue, MetricParameters as CoreMetricParameters, + MetricValue as CoreMetricValue, NewAttributes, TemporalMeter, }; use bridge_macros::{TryIntoJs, js_function}; @@ -126,7 +123,7 @@ pub fn new_metric_counter( "Failed to get metric meter".into(), ))?; - let counter = meter.inner.counter( + let counter = meter.counter( CoreMetricParameters::builder() .name(name) .unit(unit) @@ -152,7 +149,7 @@ pub fn new_metric_histogram( "Failed to get metric meter".into(), ))?; - let histogram = meter.inner.histogram( + let histogram = meter.histogram( CoreMetricParameters::builder() .name(name) .unit(unit) @@ -178,7 +175,7 @@ pub fn new_metric_histogram_f64( "Failed to get metric meter".into(), ))?; - let histogram = meter.inner.histogram_f64( + let histogram = meter.histogram_f64( CoreMetricParameters::builder() .name(name) .unit(unit) @@ -204,7 +201,7 @@ pub fn new_metric_gauge( "Failed to get metric meter".into(), ))?; - let gauge = meter.inner.gauge( + let gauge = meter.gauge( CoreMetricParameters::builder() .name(name) .unit(unit) @@ -230,7 +227,7 @@ pub fn new_metric_gauge_f64( "Failed to get metric meter".into(), ))?; - let gauge = meter.inner.gauge_f64( + let gauge = meter.gauge_f64( CoreMetricParameters::builder() .name(name) .unit(unit) @@ -254,7 +251,6 @@ pub fn add_metric_counter_value( let attributes = counter_handle .meter - .inner .new_attributes(parse_metric_attributes(attributes.value)); counter_handle.counter.add(value as u64, &attributes); @@ -270,7 +266,6 @@ pub fn record_metric_histogram_value( let histogram_handle = histogram_handle.borrow()?; let attributes = histogram_handle .meter - .inner .new_attributes(parse_metric_attributes(attributes.value)); histogram_handle.histogram.record(value, &attributes); Ok(()) @@ -285,7 +280,6 @@ pub fn record_metric_histogram_f64_value( let histogram_handle = histogram_handle.borrow()?; let attributes = histogram_handle .meter - .inner .new_attributes(parse_metric_attributes(attributes.value)); histogram_handle.histogram.record(value, &attributes); Ok(()) @@ -300,7 +294,6 @@ pub fn set_metric_gauge_value( let gauge_handle = gauge_handle.borrow()?; let attributes = gauge_handle .meter - .inner .new_attributes(parse_metric_attributes(attributes.value)); gauge_handle.gauge.record(value, &attributes); Ok(()) @@ -315,7 +308,6 @@ pub fn set_metric_gauge_f64_value( let gauge_handle = gauge_handle.borrow()?; let attributes = gauge_handle .meter - .inner .new_attributes(parse_metric_attributes(attributes.value)); gauge_handle.gauge.record(value, &attributes); Ok(()) @@ -400,14 +392,14 @@ impl MetricsCallBuffer { metric: instrument.get().as_ref().clone(), #[allow(clippy::match_same_arms, clippy::cast_precision_loss)] value: match update { - metrics::MetricUpdateVal::Duration(v) if self.use_seconds_for_durations => { + MetricUpdateVal::Duration(v) if self.use_seconds_for_durations => { v.as_secs_f64() } - metrics::MetricUpdateVal::Duration(v) => v.as_millis() as f64, - metrics::MetricUpdateVal::Delta(v) => *v as f64, - metrics::MetricUpdateVal::DeltaF64(v) => *v, - metrics::MetricUpdateVal::Value(v) => *v as f64, - metrics::MetricUpdateVal::ValueF64(v) => *v, + MetricUpdateVal::Duration(v) => v.as_millis() as f64, + MetricUpdateVal::Delta(v) => *v as f64, + MetricUpdateVal::DeltaF64(v) => *v, + MetricUpdateVal::Value(v) => *v as f64, + MetricUpdateVal::ValueF64(v) => *v, }, attributes: attributes .get() @@ -508,10 +500,10 @@ impl TryIntoJs for BufferedMetricAttributes { let k = kv.key.as_str(); #[allow(clippy::cast_precision_loss)] match &kv.value { - metrics::MetricValue::String(v) => object.set_property_from(cx, k, v.as_str()), - metrics::MetricValue::Int(v) => object.set_property_from(cx, k, *v as f64), - metrics::MetricValue::Float(v) => object.set_property_from(cx, k, *v), - metrics::MetricValue::Bool(v) => object.set_property_from(cx, k, *v), + CoreMetricValue::String(v) => object.set_property_from(cx, k, v.as_str()), + CoreMetricValue::Int(v) => object.set_property_from(cx, k, *v as f64), + CoreMetricValue::Float(v) => object.set_property_from(cx, k, *v), + CoreMetricValue::Bool(v) => object.set_property_from(cx, k, *v), }?; } diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 5eb10f07b..3f7414607 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -9,10 +9,8 @@ use temporalio_common::telemetry::{ CoreLog, OtelCollectorOptions as CoreOtelCollectorOptions, PrometheusExporterOptions as CorePrometheusExporterOptions, metrics::CoreMeter, }; -use temporalio_sdk_core::{ - CoreRuntime, TokioRuntimeBuilder, - telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}, -}; +use temporalio_common::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; +use temporalio_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; use bridge_macros::js_function; use tokio_stream::StreamExt as _; @@ -287,13 +285,14 @@ mod config { use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use neon::prelude::*; + use temporalio_common::telemetry::CoreLogStreamConsumer; use temporalio_common::telemetry::{ HistogramBucketOverrides, Logger as CoreTelemetryLogger, MetricTemporality, OtelCollectorOptions as CoreOtelCollectorOptions, OtlpProtocol, PrometheusExporterOptions as CorePrometheusExporterOptions, TelemetryOptions as CoreTelemetryOptions, }; - use temporalio_sdk_core::{Url, telemetry::CoreLogStreamConsumer}; + use temporalio_sdk_core::Url; use bridge_macros::TryFromJs; diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 7cf7c7a3a..85e844d1e 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -6,10 +6,6 @@ use prost::Message; use tokio::sync::mpsc::{Sender, channel}; use tokio_stream::wrappers::ReceiverStream; -use temporalio_common::Worker as CoreWorkerTrait; -use temporalio_common::errors::{ - CompleteActivityError, CompleteNexusError, CompleteWfError, PollError, -}; use temporalio_common::protos::{ coresdk::{ ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion, @@ -17,6 +13,7 @@ use temporalio_common::protos::{ }, temporal::api::history::v1::History, }; +use temporalio_sdk_core::{CompleteActivityError, CompleteNexusError, CompleteWfError, PollError}; use temporalio_sdk_core::{ CoreRuntime, init_replay_worker, init_worker, replay::{HistoryForReplay, ReplayWorkerInput}, @@ -82,11 +79,12 @@ pub fn worker_new( let config = worker_options.into_core_config()?; let client_ref = client.borrow()?; - let client = client_ref.core_client.clone(); + let connection = client_ref.core_connection.clone(); let runtime = client_ref.core_runtime.clone(); enter_sync!(runtime); - let worker = init_worker(&runtime, config, client).context("Failed to initialize worker")?; + let worker = + init_worker(&runtime, config, connection).context("Failed to initialize worker")?; Ok(OpaqueOutboundHandle::new(Worker { core_runtime: runtime, @@ -120,13 +118,13 @@ pub fn worker_replace_client( ) -> BridgeResult<()> { let worker_ref = worker.borrow()?; let client_ref = client.borrow()?; - let new_client = client_ref.core_client.clone(); + let new_connection = client_ref.core_connection.clone(); let runtime = worker_ref.core_runtime.clone(); enter_sync!(runtime); worker_ref .core_worker - .replace_client(new_client) + .replace_client(new_connection) .map_err(|err| BridgeError::UnexpectedError(err.to_string()))?; Ok(()) @@ -494,14 +492,14 @@ mod config { use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior; use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use temporalio_common::worker::{ - ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, - PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerDeploymentOptions as CoreWorkerDeploymentOptions, - WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, + WorkerDeploymentVersion as CoreWorkerDeploymentVersion, }; use temporalio_sdk_core::{ - ResourceBasedSlotsOptions, ResourceSlotOptions, - SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptions, + ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, + PollerBehavior as CorePollerBehavior, ResourceBasedSlotsOptions, ResourceSlotOptions, + SlotKind, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptions, + WorkerConfig, WorkerVersioningStrategy, WorkflowSlotKind, }; use super::custom_slot_supplier::CustomSlotSupplierOptions; @@ -511,7 +509,6 @@ mod config { use neon::object::Object; use neon::prelude::JsResult; use neon::types::JsObject; - use temporalio_common::worker::WorkerVersioningStrategy; #[derive(TryFromJs)] pub struct BridgeWorkerOptions { @@ -806,14 +803,14 @@ mod custom_slot_supplier { use neon::{context::Context, handle::Handle, prelude::*}; - use temporalio_common::worker::{ + use temporalio_sdk_core::{ SlotInfo as CoreSlotInfo, SlotInfoTrait as _, SlotKind, SlotKindType as CoreSlotKindType, SlotMarkUsedContext as CoreSlotMarkUsedContext, SlotReleaseContext as CoreSlotReleaseContext, SlotReservationContext as CoreSlotReservationContext, SlotSupplier as CoreSlotSupplier, + SlotSupplierOptions as CoreSlotSupplierOptions, SlotSupplierPermit as CoreSlotSupplierPermit, }; - use temporalio_sdk_core::SlotSupplierOptions as CoreSlotSupplierOptions; use bridge_macros::{TryFromJs, TryIntoJs}; use tracing::warn; diff --git a/packages/worker/src/workflow-codec-runner.ts b/packages/worker/src/workflow-codec-runner.ts index 722e04442..4b133bc3d 100644 --- a/packages/worker/src/workflow-codec-runner.ts +++ b/packages/worker/src/workflow-codec-runner.ts @@ -12,6 +12,7 @@ import { encodeOptionalSingle, noopDecodeMap, noopEncodeMap, + noopEncodeSearchAttrs, } from '@temporalio/common/lib/internal-non-workflow'; import { coresdk } from '@temporalio/proto'; @@ -242,7 +243,9 @@ export class WorkflowCodecRunner { upsertWorkflowSearchAttributes: command.upsertWorkflowSearchAttributes ? { ...command.upsertWorkflowSearchAttributes, - searchAttributes: noopEncodeMap(command.upsertWorkflowSearchAttributes.searchAttributes), + searchAttributes: noopEncodeSearchAttrs( + command.upsertWorkflowSearchAttributes.searchAttributes + ), } : undefined, respondToQuery: command.respondToQuery @@ -288,7 +291,9 @@ export class WorkflowCodecRunner { // don't encode headers headers: noopEncodeMap(command.continueAsNewWorkflowExecution.headers), // don't encode searchAttributes - searchAttributes: noopEncodeMap(command.continueAsNewWorkflowExecution.searchAttributes), + searchAttributes: noopEncodeSearchAttrs( + command.continueAsNewWorkflowExecution.searchAttributes + ), } : undefined, startChildWorkflowExecution: command.startChildWorkflowExecution @@ -299,7 +304,9 @@ export class WorkflowCodecRunner { // don't encode headers headers: noopEncodeMap(command.startChildWorkflowExecution.headers), // don't encode searchAttributes - searchAttributes: noopEncodeMap(command.startChildWorkflowExecution.searchAttributes), + searchAttributes: noopEncodeSearchAttrs( + command.startChildWorkflowExecution.searchAttributes + ), } : undefined, signalExternalWorkflowExecution: command.signalExternalWorkflowExecution diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 4492d73b7..ea2039527 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -402,7 +402,7 @@ function startChildWorkflowExecutionNextHandler({ cronSchedule: options.cronSchedule, searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated - ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line @typescript-eslint/no-deprecated + ? { indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) } // eslint-disable-line @typescript-eslint/no-deprecated : undefined, memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), versioningIntent: versioningIntentToProto(options.versioningIntent), // eslint-disable-line @typescript-eslint/no-deprecated @@ -1019,7 +1019,7 @@ export function makeContinueAsNewFunc( memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated - ? encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) // eslint-disable-line @typescript-eslint/no-deprecated + ? { indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) } // eslint-disable-line @typescript-eslint/no-deprecated : undefined, workflowRunTimeout: msOptionalToTs(options.workflowRunTimeout), workflowTaskTimeout: msOptionalToTs(options.workflowTaskTimeout), @@ -1523,7 +1523,9 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes | Sear // Typed search attributes activator.pushCommand({ upsertWorkflowSearchAttributes: { - searchAttributes: encodeUnifiedSearchAttributes(undefined, searchAttributes), + searchAttributes: { + indexedFields: encodeUnifiedSearchAttributes(undefined, searchAttributes), + }, }, }); @@ -1552,7 +1554,9 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes | Sear // Legacy search attributes activator.pushCommand({ upsertWorkflowSearchAttributes: { - searchAttributes: mapToPayloads(searchAttributePayloadConverter, searchAttributes), + searchAttributes: { + indexedFields: mapToPayloads(searchAttributePayloadConverter, searchAttributes), + }, }, });