Skip to content

Commit 716f3ae

Browse files
juliusmarmingecodex
authored andcommitted
Use lazy stream accessors for provider runtime events (pingdotgg#1746)
Co-authored-by: codex <codex@users.noreply.github.com>
1 parent 0089447 commit 716f3ae

14 files changed

+176
-55
lines changed

apps/server/integration/OrchestrationEngineHarness.integration.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea
4848
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
4949
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
5050
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
51-
import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
51+
import { RuntimeReceiptBusTest } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
5252
import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts";
5353
import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts";
5454
import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts";
@@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = (
297297
ProjectionPendingApprovalRepositoryLive,
298298
checkpointStoreLayer,
299299
providerLayer,
300-
RuntimeReceiptBusLive,
300+
RuntimeReceiptBusTest,
301301
);
302302
const serverSettingsLayer = ServerSettingsService.layerTest();
303303
const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe(
@@ -376,7 +376,7 @@ export const makeOrchestrationIntegrationHarness = (
376376
runtime.runPromise(reactor.start().pipe(Scope.provide(scope))),
377377
).pipe(Effect.orDie);
378378
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
379-
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>
379+
yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) =>
380380
Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid),
381381
).pipe(Effect.forkIn(scope));
382382
yield* Effect.sleep(10);

apps/server/integration/providerService.integration.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ const collectEventsDuring = <A, E, R>(
8686
Effect.forkScoped,
8787
);
8888

89+
yield* Effect.sleep("50 millis");
8990
yield* action;
9091

9192
return yield* Effect.forEach(
@@ -104,7 +105,6 @@ const runTurn = (input: {
104105
}) =>
105106
Effect.gen(function* () {
106107
yield* input.harness.queueTurnResponse(input.threadId, input.response);
107-
108108
return yield* collectEventsDuring(
109109
input.provider.streamEvents,
110110
input.response.events.length,
@@ -116,7 +116,7 @@ const runTurn = (input: {
116116
);
117117
});
118118

119-
it.effect("replays typed runtime fixture events", () =>
119+
it.live("replays typed runtime fixture events", () =>
120120
Effect.gen(function* () {
121121
const fixture = yield* makeIntegrationFixture;
122122

@@ -149,7 +149,7 @@ it.effect("replays typed runtime fixture events", () =>
149149
}).pipe(Effect.provide(NodeServices.layer)),
150150
);
151151

152-
it.effect("replays file-changing fixture turn events", () =>
152+
it.live("replays file-changing fixture turn events", () =>
153153
Effect.gen(function* () {
154154
const fixture = yield* makeIntegrationFixture;
155155
const { join } = yield* Path.Path;
@@ -188,7 +188,7 @@ it.effect("replays file-changing fixture turn events", () =>
188188
}).pipe(Effect.provide(NodeServices.layer)),
189189
);
190190

191-
it.effect("runs multi-turn tool/approval flow", () =>
191+
it.live("runs multi-turn tool/approval flow", () =>
192192
Effect.gen(function* () {
193193
const fixture = yield* makeIntegrationFixture;
194194
const { join } = yield* Path.Path;
@@ -242,7 +242,7 @@ it.effect("runs multi-turn tool/approval flow", () =>
242242
}).pipe(Effect.provide(NodeServices.layer)),
243243
);
244244

245-
it.effect("rolls back provider conversation state only", () =>
245+
it.live("rolls back provider conversation state only", () =>
246246
Effect.gen(function* () {
247247
const fixture = yield* makeIntegrationFixture;
248248
const { join } = yield* Path.Path;

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ function createProviderServiceHarness(
9797
listSessions,
9898
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
9999
rollbackConversation,
100-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
100+
get streamEvents() {
101+
return Stream.fromPubSub(runtimeEventPubSub);
102+
},
101103
};
102104

103105
const emit = (event: LegacyProviderRuntimeEvent): void => {

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ describe("ProviderCommandReactor", () => {
210210
sessionModelSwitch: input?.sessionModelSwitch ?? "in-session",
211211
}),
212212
rollbackConversation: () => unsupported(),
213-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
213+
get streamEvents() {
214+
return Stream.fromPubSub(runtimeEventPubSub);
215+
},
214216
};
215217

216218
const orchestrationLayer = OrchestrationEngineLive.pipe(

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ function createProviderServiceHarness() {
100100
listSessions: () => Effect.succeed([...runtimeSessions]),
101101
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
102102
rollbackConversation: () => unsupported(),
103-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
103+
get streamEvents() {
104+
return Stream.fromPubSub(runtimeEventPubSub);
105+
},
104106
};
105107

106108
const setSession = (session: ProviderSession): void => {
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
/**
2+
* RuntimeReceiptBus layers.
3+
*
4+
* `RuntimeReceiptBusLive` is the production default and intentionally does not
5+
* retain or broadcast receipts. `RuntimeReceiptBusTest` installs the in-memory
6+
* PubSub-backed implementation used by integration tests that need to await
7+
* checkpoint-reactor milestones precisely.
8+
*
9+
* @module RuntimeReceiptBus
10+
*/
111
import { Effect, Layer, PubSub, Stream } from "effect";
212

313
import {
@@ -6,13 +16,21 @@ import {
616
type OrchestrationRuntimeReceipt,
717
} from "../Services/RuntimeReceiptBus.ts";
818

9-
const makeRuntimeReceiptBus = Effect.gen(function* () {
19+
const makeRuntimeReceiptBus = Effect.succeed({
20+
publish: () => Effect.void,
21+
streamEventsForTest: Stream.empty,
22+
} satisfies RuntimeReceiptBusShape);
23+
24+
const makeRuntimeReceiptBusTest = Effect.gen(function* () {
1025
const pubSub = yield* PubSub.unbounded<OrchestrationRuntimeReceipt>();
1126

1227
return {
1328
publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid),
14-
stream: Stream.fromPubSub(pubSub),
29+
get streamEventsForTest() {
30+
return Stream.fromPubSub(pubSub);
31+
},
1532
} satisfies RuntimeReceiptBusShape;
1633
});
1734

1835
export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus);
36+
export const RuntimeReceiptBusTest = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest);

apps/server/src/orchestration/Services/RuntimeReceiptBus.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/**
2+
* RuntimeReceiptBus - Internal checkpoint-reactor synchronization receipts.
3+
*
4+
* This service exists to expose short-lived orchestration milestones that are
5+
* useful in tests and harnesses but are not part of the production runtime
6+
* event model. `CheckpointReactor` publishes receipts such as baseline capture,
7+
* diff finalization, and turn-processing quiescence so integration tests can
8+
* wait for those exact points without inferring them indirectly from persisted
9+
* state.
10+
*
11+
* Production code should only call `publish`. Test code may subscribe via
12+
* `streamEventsForTest`, which is intentionally named to make the intended
13+
* usage explicit.
14+
*
15+
* @module RuntimeReceiptBus
16+
*/
117
import { CheckpointRef, IsoDateTime, NonNegativeInt, ThreadId, TurnId } from "@t3tools/contracts";
218
import { Schema, ServiceMap } from "effect";
319
import type { Effect, Stream } from "effect";
@@ -40,7 +56,7 @@ export type OrchestrationRuntimeReceipt = typeof OrchestrationRuntimeReceipt.Typ
4056

4157
export interface RuntimeReceiptBusShape {
4258
readonly publish: (receipt: OrchestrationRuntimeReceipt) => Effect.Effect<void>;
43-
readonly stream: Stream.Stream<OrchestrationRuntimeReceipt>;
59+
readonly streamEventsForTest: Stream.Stream<OrchestrationRuntimeReceipt>;
4460
}
4561

4662
export class RuntimeReceiptBus extends ServiceMap.Service<

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3054,7 +3054,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
30543054
listSessions,
30553055
hasSession,
30563056
stopAll,
3057-
streamEvents: Stream.fromQueue(runtimeEventQueue),
3057+
get streamEvents() {
3058+
return Stream.fromQueue(runtimeEventQueue);
3059+
},
30583060
} satisfies ClaudeAdapterShape;
30593061
});
30603062

apps/server/src/provider/Layers/CodexAdapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1631,7 +1631,9 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16311631
listSessions,
16321632
hasSession,
16331633
stopAll,
1634-
streamEvents: Stream.fromQueue(runtimeEventQueue),
1634+
get streamEvents() {
1635+
return Stream.fromQueue(runtimeEventQueue);
1636+
},
16351637
} satisfies CodexAdapterShape;
16361638
});
16371639

apps/server/src/provider/Layers/EventNdjsonLogger.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,49 @@ describe("EventNdjsonLogger", () => {
112112
}),
113113
);
114114

115+
it.effect("serializes concurrent first writes for the same segment", () =>
116+
Effect.gen(function* () {
117+
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));
118+
const basePath = path.join(tempDir, "provider-canonical.ndjson");
119+
120+
try {
121+
const logger = yield* makeEventNdjsonLogger(basePath, {
122+
stream: "canonical",
123+
batchWindowMs: 0,
124+
});
125+
assert.notEqual(logger, undefined);
126+
if (!logger) {
127+
return;
128+
}
129+
130+
yield* Effect.all(
131+
[
132+
logger.write({ id: "evt-concurrent-1" }, null),
133+
logger.write({ id: "evt-concurrent-2" }, null),
134+
],
135+
{ concurrency: "unbounded" },
136+
);
137+
yield* logger.close();
138+
139+
const globalPath = path.join(tempDir, "_global.log");
140+
assert.equal(fs.existsSync(globalPath), true);
141+
const lines = fs
142+
.readFileSync(globalPath, "utf8")
143+
.trim()
144+
.split("\n")
145+
.map((line) => parseLogLine(line));
146+
147+
assert.equal(lines.length, 2);
148+
assert.deepEqual(lines.map((line) => line.payload).toSorted(), [
149+
'{"id":"evt-concurrent-1"}',
150+
'{"id":"evt-concurrent-2"}',
151+
]);
152+
} finally {
153+
fs.rmSync(tempDir, { recursive: true, force: true });
154+
}
155+
}),
156+
);
157+
115158
it.effect("rotates per-thread files when max size is exceeded", () =>
116159
Effect.gen(function* () {
117160
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));

0 commit comments

Comments
 (0)