Skip to content

Commit fa18a14

Browse files
feat: allow binary gRPC headers
1 parent 8903ce8 commit fa18a14

File tree

5 files changed

+106
-21
lines changed

5 files changed

+106
-21
lines changed

packages/core-bridge/src/client.rs

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use std::{collections::HashMap, sync::Arc};
44

55
use neon::prelude::*;
6-
use tonic::metadata::MetadataKey;
6+
use tonic::metadata::{BinaryMetadataValue, MetadataKey};
77

88
use temporal_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient};
99

@@ -130,10 +130,16 @@ pub struct RpcCall {
130130
pub rpc: String,
131131
pub req: Vec<u8>,
132132
pub retry: bool,
133-
pub metadata: HashMap<String, String>,
133+
pub metadata: HashMap<String, MetadataValue>,
134134
pub timeout: Option<Duration>,
135135
}
136136

137+
#[derive(Debug, Clone, TryFromJs)]
138+
pub enum MetadataValue {
139+
Ascii { value: String },
140+
Binary { value: Vec<u8> },
141+
}
142+
137143
/// Send a request to the Workflow Service using the provided Client
138144
#[js_function]
139145
pub fn client_send_workflow_service_request(
@@ -584,16 +590,29 @@ fn rpc_req<P: prost::Message + Default>(call: RpcCall) -> BridgeResult<tonic::Re
584590

585591
let mut req = tonic::Request::new(proto);
586592
for (k, v) in call.metadata {
587-
req.metadata_mut().insert(
588-
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
589-
field: None,
590-
message: format!("Invalid metadata key: {err}"),
591-
})?,
592-
v.parse().map_err(|err| BridgeError::TypeError {
593-
field: None,
594-
message: format!("Invalid metadata value: {err}"),
595-
})?,
596-
);
593+
match v {
594+
MetadataValue::Ascii { value: v } => {
595+
req.metadata_mut().insert(
596+
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
597+
field: None,
598+
message: format!("Invalid metadata key: {err}"),
599+
})?,
600+
v.parse().map_err(|err| BridgeError::TypeError {
601+
field: None,
602+
message: format!("Invalid metadata value: {err}"),
603+
})?,
604+
);
605+
}
606+
MetadataValue::Binary { value: v } => {
607+
req.metadata_mut().insert_bin(
608+
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
609+
field: None,
610+
message: format!("Invalid metadata key: {err}"),
611+
})?,
612+
BinaryMetadataValue::from_bytes(&v),
613+
);
614+
}
615+
}
597616
}
598617

599618
if let Some(timeout) = call.timeout {
@@ -628,7 +647,7 @@ mod config {
628647

629648
use bridge_macros::TryFromJs;
630649

631-
use crate::helpers::*;
650+
use crate::{client::MetadataValue, helpers::*};
632651

633652
#[derive(Debug, Clone, TryFromJs)]
634653
pub(super) struct ClientOptions {
@@ -637,7 +656,7 @@ mod config {
637656
client_version: String,
638657
tls: Option<TlsConfig>,
639658
http_connect_proxy: Option<HttpConnectProxy>,
640-
headers: Option<HashMap<String, String>>,
659+
headers: Option<HashMap<String, MetadataValue>>,
641660
api_key: Option<String>,
642661
disable_error_code_metric_tags: bool,
643662
}
@@ -677,13 +696,16 @@ mod config {
677696
builder.tls_cfg(tls.into());
678697
}
679698

699+
let (ascii_headers, bin_headers) = partition_headers(self.headers);
700+
680701
let client_options = builder
681702
.target_url(self.target_url)
682703
.client_name(self.client_name)
683704
.client_version(self.client_version)
684705
// tls_cfg -- above
685706
.http_connect_proxy(self.http_connect_proxy.map(Into::into))
686-
.headers(self.headers)
707+
.headers(ascii_headers)
708+
.binary_headers(bin_headers)
687709
.api_key(self.api_key)
688710
.disable_error_code_metric_tags(self.disable_error_code_metric_tags)
689711
// identity -- skipped: will be set on worker
@@ -719,4 +741,32 @@ mod config {
719741
}
720742
}
721743
}
744+
745+
fn partition_headers(
746+
headers: Option<HashMap<String, MetadataValue>>,
747+
) -> (
748+
Option<HashMap<String, String>>,
749+
Option<HashMap<String, Vec<u8>>>,
750+
) {
751+
let Some(headers) = headers else {
752+
return (None, None);
753+
};
754+
// Maybe with_capacity this assuming primarily ascii headers?
755+
let mut ascii_headers = HashMap::default();
756+
let mut bin_headers = HashMap::default();
757+
for (k, v) in headers {
758+
match v {
759+
MetadataValue::Ascii { value: v } => {
760+
ascii_headers.insert(k, v);
761+
}
762+
MetadataValue::Binary { value: v } => {
763+
bin_headers.insert(k, v);
764+
}
765+
}
766+
}
767+
(
768+
(!ascii_headers.is_empty()).then_some(ascii_headers),
769+
(!bin_headers.is_empty()).then_some(bin_headers),
770+
)
771+
}
722772
}

packages/core-bridge/ts/native.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export interface ClientOptions {
124124
clientVersion: string;
125125
tls: Option<TLSConfig>;
126126
httpConnectProxy: Option<HttpConnectProxy>;
127-
headers: Option<Record<string, string>>;
127+
headers: Option<Record<string, MetadataValue>>;
128128
apiKey: Option<string>;
129129
disableErrorCodeMetricTags: boolean;
130130
}
@@ -157,7 +157,7 @@ export interface RpcCall {
157157
rpc: string;
158158
req: Buffer;
159159
retry: boolean;
160-
metadata: Record<string, string>;
160+
metadata: Record<string, MetadataValue>;
161161
timeout: Option<number>;
162162
}
163163

@@ -191,6 +191,16 @@ export interface Worker {
191191
type: 'worker';
192192
}
193193

194+
export type MetadataValue =
195+
| {
196+
type: 'ascii';
197+
value: string;
198+
}
199+
| {
200+
type: 'binary';
201+
value: Buffer;
202+
};
203+
194204
export interface WorkerOptions {
195205
identity: string;
196206
buildId: string;

packages/test/src/test-native-connection.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,18 @@ test('withMetadata and withDeadline propagate metadata and deadline', async (t)
203203
);
204204
const connection = await NativeConnection.connect({
205205
address: `127.0.0.1:${port}`,
206+
metadata: { 'default-bin': Buffer.from([0x00]) },
206207
});
207208

208209
await connection.withDeadline(Date.now() + 10_000, () =>
209-
connection.withMetadata({ test: 'true' }, () => connection.workflowService.getSystemInfo({}))
210+
connection.withMetadata({ test: 'true', 'other-bin': Buffer.from([0x01]) }, () =>
211+
connection.workflowService.getSystemInfo({})
212+
)
210213
);
211214
t.is(requests.length, 2);
212215
t.is(requests[1].metadata.get('test').toString(), 'true');
216+
t.deepEqual(requests[1].metadata.get('default-bin'), [Buffer.from([0x00])]);
217+
t.deepEqual(requests[1].metadata.get('other-bin'), [Buffer.from([0x01])]);
213218
t.true(typeof requests[1].deadline === 'number' && requests[1].deadline > 5_000);
214219
await connection.close();
215220
server.forceShutdown();

packages/worker/src/connection-options.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
TLSConfig,
99
} from '@temporalio/common/lib/internal-non-workflow';
1010
import pkg from './pkg';
11+
import type { Metadata } from '@temporalio/client';
1112

1213
export { TLSConfig, ProxyConfig };
1314

@@ -44,7 +45,7 @@ export interface NativeConnectionOptions {
4445
*
4546
* Set statically at connection time, can be replaced later using {@link NativeConnection.setMetadata}.
4647
*/
47-
metadata?: Record<string, string>;
48+
metadata?: Metadata;
4849

4950
/**
5051
* API key for Temporal. This becomes the "Authorization" HTTP header with "Bearer " prepended.
@@ -102,13 +103,25 @@ export function toNativeClientOptions(options: NativeConnectionOptions): native.
102103
);
103104
}
104105

106+
let headers: Record<string, native.MetadataValue> | null = null;
107+
if (options.metadata) {
108+
headers = {};
109+
for (const [key, value] of Object.entries(options.metadata)) {
110+
if (typeof value === 'string') {
111+
headers[key] = { type: 'ascii', value };
112+
} else {
113+
headers[key] = { type: 'binary', value };
114+
}
115+
}
116+
}
117+
105118
return {
106119
targetUrl: tls ? `https://${address}` : `http://${address}`,
107120
clientName: 'temporal-typescript',
108121
clientVersion: pkg.version,
109122
tls,
110123
httpConnectProxy,
111-
headers: options.metadata ?? null,
124+
headers,
112125
apiKey: options.apiKey ?? null,
113126
disableErrorCodeMetricTags: options.disableErrorCodeMetricTags ?? false,
114127
};

packages/worker/src/connection.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,14 @@ export class NativeConnection implements ConnectionLike {
127127

128128
const ctx = this.callContextStorage.getStore() ?? {};
129129
const metadata =
130-
ctx.metadata != null ? Object.fromEntries(Object.entries(ctx.metadata).map(([k, v]) => [k, v.toString()])) : {};
130+
ctx.metadata != null
131+
? Object.fromEntries(
132+
Object.entries(ctx.metadata).map(([k, value]) => [
133+
k,
134+
typeof value === 'string' ? { type: 'ascii' as const, value } : { type: 'binary' as const, value },
135+
])
136+
)
137+
: {};
131138

132139
const req = {
133140
rpc: method.name,

0 commit comments

Comments
 (0)