Skip to content

Commit b9ec854

Browse files
feat: update setMetadata to accept binary headers
1 parent 2235a4f commit b9ec854

File tree

4 files changed

+62
-15
lines changed

4 files changed

+62
-15
lines changed

packages/core-bridge/src/client.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,23 @@ pub fn client_new(
8888
#[js_function]
8989
pub fn client_update_headers(
9090
client: OpaqueInboundHandle<Client>,
91-
headers: HashMap<String, String>,
91+
headers: HashMap<String, MetadataValue>,
9292
) -> BridgeResult<()> {
93+
let (ascii_headers, bin_headers) = config::partition_headers(Some(headers));
9394
client
9495
.borrow()?
9596
.core_client
9697
.get_client()
97-
.set_headers(headers)
98+
.set_headers(ascii_headers.unwrap_or_default())
99+
.map_err(|err| BridgeError::TypeError {
100+
message: format!("Invalid metadata key: {err}"),
101+
field: None,
102+
})?;
103+
client
104+
.borrow()?
105+
.core_client
106+
.get_client()
107+
.set_binary_headers(bin_headers.unwrap_or_default())
98108
.map_err(|err| BridgeError::TypeError {
99109
message: format!("Invalid metadata key: {err}"),
100110
field: None,
@@ -742,7 +752,7 @@ mod config {
742752
}
743753
}
744754

745-
fn partition_headers(
755+
pub(super) fn partition_headers(
746756
headers: Option<HashMap<String, MetadataValue>>,
747757
) -> (
748758
Option<HashMap<String, String>>,

packages/core-bridge/ts/native.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export interface OtelMetricsExporterOptions {
100100

101101
export declare function newClient(runtime: Runtime, clientOptions: ClientOptions): Promise<Client>;
102102

103-
export declare function clientUpdateHeaders(client: Client, headers: Record<string, string>): void;
103+
export declare function clientUpdateHeaders(client: Client, headers: Record<string, MetadataValue>): void;
104104

105105
export declare function clientUpdateApiKey(client: Client, apiKey: string): void;
106106

packages/test/src/test-native-connection.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,3 +439,39 @@ test('can power workflow client calls', async (t) => {
439439
await env.teardown();
440440
}
441441
});
442+
443+
test('setMetadata accepts binary headers', async (t) => {
444+
const requests = new Array<{ metadata: grpc.Metadata; deadline: grpc.Deadline }>();
445+
const server = new grpc.Server();
446+
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
447+
getSystemInfo(
448+
call: grpc.ServerUnaryCall<
449+
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
450+
temporal.api.workflowservice.v1.IGetSystemInfoResponse
451+
>,
452+
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
453+
) {
454+
requests.push({ metadata: call.metadata, deadline: call.getDeadline() });
455+
callback(null, {});
456+
},
457+
});
458+
459+
const port = await util.promisify(server.bindAsync.bind(server))(
460+
'localhost:0',
461+
grpc.ServerCredentials.createInsecure()
462+
);
463+
const connection = await NativeConnection.connect({
464+
address: `127.0.0.1:${port}`,
465+
metadata: { 'start-ascii': 'a', 'start-bin': Buffer.from([0x00]) },
466+
});
467+
468+
await connection.setMetadata({ 'end-bin': Buffer.from([0x01]) });
469+
470+
await connection.workflowService.getSystemInfo({});
471+
t.is(requests.length, 2);
472+
t.deepEqual(requests[1].metadata.get('start-bin'), []);
473+
t.deepEqual(requests[1].metadata.get('start-ascii'), []);
474+
t.deepEqual(requests[1].metadata.get('end-bin'), [Buffer.from([0x01])]);
475+
await connection.close();
476+
server.forceShutdown();
477+
});

packages/worker/src/connection.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,7 @@ export class NativeConnection implements ConnectionLike {
126126
// TODO: add support for abortSignal
127127

128128
const ctx = this.callContextStorage.getStore() ?? {};
129-
const metadata =
130-
ctx.metadata != null
131-
? Object.fromEntries(
132-
Object.entries(ctx.metadata).map(([k, value]) => [
133-
k,
134-
typeof value === 'string' ? { type: 'ascii' as const, value } : { type: 'binary' as const, value },
135-
])
136-
)
137-
: {};
129+
const metadata = ctx.metadata != null ? tagMetadata(ctx.metadata) : {};
138130

139131
const req = {
140132
rpc: method.name,
@@ -271,8 +263,8 @@ export class NativeConnection implements ConnectionLike {
271263
*
272264
* Use {@link NativeConnectionOptions.metadata} to set the initial metadata for client creation.
273265
*/
274-
async setMetadata(metadata: Record<string, string>): Promise<void> {
275-
native.clientUpdateHeaders(this.nativeClient, metadata);
266+
async setMetadata(metadata: Metadata): Promise<void> {
267+
native.clientUpdateHeaders(this.nativeClient, tagMetadata(metadata));
276268
}
277269

278270
/**
@@ -348,3 +340,12 @@ function getRelativeTimeout(deadline: grpc.Deadline) {
348340
return timeout;
349341
}
350342
}
343+
344+
function tagMetadata(metadata: Metadata): Record<string, native.MetadataValue> {
345+
return Object.fromEntries(
346+
Object.entries(metadata).map(([k, value]) => [
347+
k,
348+
typeof value === 'string' ? { type: 'ascii' as const, value } : { type: 'binary' as const, value },
349+
])
350+
);
351+
}

0 commit comments

Comments
 (0)