Skip to content

Commit

Permalink
cleanup types on looker for worker processing
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminpkane committed Mar 3, 2025
1 parent f82e76e commit 2f12ce0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 94 deletions.
75 changes: 41 additions & 34 deletions app/packages/looker/src/lookers/abstract.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Lookers } from "@fiftyone/state";
import {
AppError,
DATE_FIELD,
Expand Down Expand Up @@ -83,39 +82,36 @@ export abstract class AbstractLooker<
State extends BaseState,
S extends Sample = Sample
> {
public readonly subscriptions: {
private readonly canvas: HTMLCanvasElement;
private readonly ctx: CanvasRenderingContext2D;
private readonly rootEvents: Events<State>;
private readonly subscriptions: {
[fieldName: string]: ((newValue: any) => void)[];
};
private eventTarget: EventTarget;

private eventTarget: EventTarget;
private hideControlsTimeout: ReturnType<typeof setTimeout> | null = null;
protected lookerElement: LookerElement<State>;
private resizeObserver: ResizeObserver;
private readonly canvas: HTMLCanvasElement;
private readonly ctx: CanvasRenderingContext2D;
private previousState?: Readonly<State>;
private readonly rootEvents: Events<State>;
private isSampleUpdating: boolean = false;

protected readonly abortController: AbortController;
protected currentOverlays: Overlay<State>[];
protected sample: S;
protected readonly updater: StateUpdate<State>;
private resizeObserver: ResizeObserver;

private batchMergedUpdates: Partial<State> = {};
private isBatching = false;
private isCommittingBatchUpdates = false;
private isSampleUpdating = false;

public uuid = uuid();
protected readonly abortController: AbortController;
protected readonly updater: StateUpdate<State>;

/** @internal */
state: State;
protected asyncLabelsRenderingManager: AsyncLabelsRenderingManager<S>;
protected currentOverlays: Overlay<State>[];
protected lookerElement: LookerElement<State>;
protected sample: S;
protected state: State;

public uuid = uuid();
sampleOverlays: Overlay<State>[];
pluckedOverlays: Overlay<State>[];

public asyncLabelsRenderingManager: AsyncLabelsRenderingManager;

constructor(
sample: S,
config: State["config"],
Expand Down Expand Up @@ -172,9 +168,7 @@ export abstract class AbstractLooker<
3500
);

this.asyncLabelsRenderingManager = new AsyncLabelsRenderingManager(
this as unknown as Lookers
);
this.asyncLabelsRenderingManager = new AsyncLabelsRenderingManager();

this.init();
}
Expand Down Expand Up @@ -614,8 +608,19 @@ export abstract class AbstractLooker<

this.asyncLabelsRenderingManager
.enqueueLabelPaintingJob({
sample: this.sample,
labels: renderLabels,
options: {
coloring: this.state.options.coloring,
customizeColorSetting: this.state.options.customizeColorSetting,
colorscale: this.state.options.colorscale,
labelTagColors: this.state.options.labelTagColors,
selectedLabelTags: this.state.options.selectedLabelTags,
sources: this.state.config.sources,
schema: this.state.config.fieldSchema,
activePaths: this.state.options.activePaths,
},
schema: this.state.config.fieldSchema,
sample: this.sample,
})
.then(({ sample, coloring }) => {
this.sample = sample;
Expand Down Expand Up @@ -855,19 +860,21 @@ export abstract class AbstractLooker<

labelsWorker.addEventListener("message", listener);

const workerArgs = {
sample: sample as ProcessSample["sample"],
const workerArgs: ProcessSample & { method: "processSample" } = {
method: "processSample",
coloring: this.state.options.coloring,
customizeColorSetting: this.state.options.customizeColorSetting,
colorscale: this.state.options.colorscale,
labelTagColors: this.state.options.labelTagColors,
selectedLabelTags: this.state.options.selectedLabelTags,
sources: this.state.config.sources,
schema: this.state.config.fieldSchema,
options: {
coloring: this.state.options.coloring,
customizeColorSetting: this.state.options.customizeColorSetting,
colorscale: this.state.options.colorscale,
labelTagColors: this.state.options.labelTagColors,
selectedLabelTags: this.state.options.selectedLabelTags,
sources: this.state.config.sources,
schema: this.state.config.fieldSchema,
activePaths: this.state.options.activePaths,
},
uuid: messageUUID,
activePaths: this.state.options.activePaths,
} as ProcessSample;
sample: sample as ProcessSample["sample"],
};

try {
labelsWorker.postMessage(workerArgs, transfer);
Expand Down
80 changes: 35 additions & 45 deletions app/packages/looker/src/worker/async-labels-rendering-manager.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
import { Lookers } from "@fiftyone/state";
import {
jotaiStore,
numConcurrentRenderingLabels,
} from "@fiftyone/state/src/jotai";
import { Schema } from "@fiftyone/utilities";
import { v4 as uuid } from "uuid";
import { ProcessSample } from ".";
import { Coloring, Sample } from "..";
import type { ProcessSample, ProcessSampleOptions } from ".";
import type { Coloring, Sample } from "..";
import { LookerUtils } from "../lookers/shared";
import { retrieveTransferables } from "../lookers/utils";
import { accumulateOverlays } from "../overlays";
import { createWorker } from "../util";

export type AsyncLabelsRenderingJob = {
sample: Sample;
export type AsyncLabelsRenderingJob<S extends Sample = Sample> = {
labels: string[];
lookerRef: Lookers;
resolve: (data: Omit<WorkerResponse, "uuid">) => void;
options: ProcessSampleOptions;
resolve: (data: Omit<WorkerResponse<S>, "uuid">) => void;
reject: (error: Error) => void;
schema: Schema;
sample: S;
};

export type AsyncJobResolutionResult = {
sample: Sample;
export type AsyncJobResolutionResult<S extends Sample = Sample> = {
sample: S;
coloring: Coloring;
};

export type WorkerResponse = {
sample: Sample;
export type WorkerResponse<S extends Sample = Sample> = {
sample: S;
coloring: Coloring;
uuid: string;
};
Expand All @@ -35,7 +36,7 @@ const MAX_WORKERS =

// global job queue and indexes
const jobQueue: AsyncLabelsRenderingJob[] = [];
const pendingJobs = new Map<Sample, AsyncLabelsRenderingJob>();
const pendingJobs = new Map();
const processingSamples = new Set<Sample>();

const workerPool: Worker[] = Array.from({ length: MAX_WORKERS }, () =>
Expand Down Expand Up @@ -124,14 +125,14 @@ const assignJobToFreeWorker = (job: AsyncLabelsRenderingJob) => {
// filter sample to only include keys in job.labels
const pluckRelevant = (sample: Sample, frames = false) => {
const filtered = { ...sample };
Object.keys(filtered).forEach((key) => {
for (const key of Object.keys(filtered)) {
if (!job.labels.includes(frames ? `frames.${key}` : key)) {
if (!frames && key === "frames") {
return;
continue;
}
delete filtered[key];
}
});
}

if (filtered.frames?.length) {
filtered.frames = filtered.frames.map((frame) => {
Expand All @@ -147,19 +148,12 @@ const assignJobToFreeWorker = (job: AsyncLabelsRenderingJob) => {
method: "processSample",
sample: filteredSample as ProcessSample["sample"],
uuid: messageUuid,
coloring: job.lookerRef.state.options.coloring,
customizeColorSetting: job.lookerRef.state.options.customizeColorSetting,
colorscale: job.lookerRef.state.options.colorscale,
labelTagColors: job.lookerRef.state.options.labelTagColors,
selectedLabelTags: job.lookerRef.state.options.selectedLabelTags,
sources: job.lookerRef.state.config.sources,
schema: job.lookerRef.state.config.fieldSchema,
activePaths: job.lookerRef.state.options.activePaths,
options: job.options,
};

const { overlays: filteredOverlays } = accumulateOverlays(
filteredSample,
job.lookerRef.state.config.fieldSchema
job.schema
);
const transfer = retrieveTransferables(filteredOverlays);

Expand All @@ -168,41 +162,37 @@ const assignJobToFreeWorker = (job: AsyncLabelsRenderingJob) => {
updateRenderingCount(1);
};

export class AsyncLabelsRenderingManager {
#lookerRef: Lookers;

constructor(lookerRef: Lookers) {
this.#lookerRef = lookerRef;
}

export class AsyncLabelsRenderingManager<S extends Sample = Sample> {
/**
* Enqueue a new overlay rendering job.
* If a pending job exists for the same sample, update it.
*/
enqueueLabelPaintingJob(
item: Omit<AsyncLabelsRenderingJob, "resolve" | "reject" | "lookerRef">
): Promise<AsyncJobResolutionResult> {
item: Omit<AsyncLabelsRenderingJob<S>, "resolve" | "reject">
): Promise<AsyncJobResolutionResult<S>> {
const { sample, labels } = item;

return new Promise<AsyncJobResolutionResult>((resolve, reject) => {
return new Promise<AsyncJobResolutionResult<S>>((resolve, reject) => {
const pendingJob = pendingJobs.get(sample);
if (pendingJob) {
// merge / replace pending job for the same sample
pendingJob.labels = [...new Set([...pendingJob.labels, ...labels])];
pendingJob.resolve = resolve;
pendingJob.reject = reject;
} else {
const job: AsyncLabelsRenderingJob = {
sample,
labels,
lookerRef: this.#lookerRef,
resolve,
reject,
};
pendingJobs.set(sample, job);
jobQueue.push(job);
processQueue();
return;
}

const job: AsyncLabelsRenderingJob<S> = {
labels,
options,

Check failure on line 187 in app/packages/looker/src/worker/async-labels-rendering-manager.ts

View workflow job for this annotation

GitHub Actions / test / test-app

Unhandled error

ReferenceError: options is not defined ❯ packages/looker/src/worker/async-labels-rendering-manager.ts:187:9 ❯ AsyncLabelsRenderingManager.enqueueLabelPaintingJob packages/looker/src/worker/async-labels-rendering-manager.ts:175:12 ❯ packages/looker/src/worker/async-labels-rendering-manager.test.ts:104:13 ❯ node_modules/@vitest/runner/dist/index.js:146:14 ❯ node_modules/@vitest/runner/dist/index.js:61:7 ❯ runTest node_modules/@vitest/runner/dist/index.js:960:17 ❯ processTicksAndRejections node:internal/process/task_queues:95:5 ❯ runSuite node_modules/@vitest/runner/dist/index.js:1116:15 ❯ runSuite node_modules/@vitest/runner/dist/index.js:1116:15 This error originated in "packages/looker/src/worker/async-labels-rendering-manager.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "packages/looker/src/worker/async-labels-rendering-manager.test.ts". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 187 in app/packages/looker/src/worker/async-labels-rendering-manager.ts

View workflow job for this annotation

GitHub Actions / test / test-app

packages/looker/src/worker/async-labels-rendering-manager.test.ts > AsyncLabelsRenderingManager > should enqueue a new job and resolve with merged sample

ReferenceError: options is not defined ❯ packages/looker/src/worker/async-labels-rendering-manager.ts:187:9 ❯ AsyncLabelsRenderingManager.enqueueLabelPaintingJob packages/looker/src/worker/async-labels-rendering-manager.ts:175:12 ❯ packages/looker/src/worker/async-labels-rendering-manager.test.ts:87:29
resolve,
reject,
schema,
sample,
};
pendingJobs.set(sample, job);
jobQueue.push(job);
processQueue();
});
}

Expand Down
35 changes: 20 additions & 15 deletions app/packages/looker/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import {
DETECTIONS,
DYNAMIC_EMBEDDED_DOCUMENT,
EMBEDDED_DOCUMENT,
getCls,
getFetchFunction,
LABEL_LIST,
Schema,
setFetchFunction,
Stage,
VALID_LABEL_TYPES,
getCls,
getFetchFunction,
setFetchFunction,
} from "@fiftyone/utilities";
import { CHUNK_SIZE } from "../constants";
import {
Expand Down Expand Up @@ -315,9 +315,7 @@ interface ReaderMethod {
method: string;
}

export interface ProcessSample {
uuid: string;
sample: Sample & FrameSample;
export interface ProcessSampleOptions {
coloring: Coloring;
customizeColorSetting: CustomizeColor[];
labelTagColors: LabelTagColor;
Expand All @@ -327,20 +325,27 @@ export interface ProcessSample {
schema: Schema;
activePaths: string[];
}
export interface ProcessSample {
sample: Sample & FrameSample;
options: ProcessSampleOptions;
uuid: string;
}

type ProcessSampleMethod = ReaderMethod & ProcessSample;

const processSample = async ({
sample,
options: {
coloring,
sources,
customizeColorSetting,
colorscale,
selectedLabelTags,
labelTagColors,
schema,
activePaths,
},
uuid,
coloring,
sources,
customizeColorSetting,
colorscale,
selectedLabelTags,
labelTagColors,
schema,
activePaths,
}: ProcessSample) => {
if (!sample) {
// edge case where looker hasn't been associated with a sample yet
Expand All @@ -357,7 +362,7 @@ const processSample = async ({
mapId(sample);

const imageBitmapPromises: Promise<ImageBitmap[]>[] = [];
let maskTargetsBuffers: ArrayBuffer[] = [];
const maskTargetsBuffers: ArrayBuffer[] = [];

if (sample?._media_type === "point-cloud" || sample?._media_type === "3d") {
// we process all 3d labels regardless of active paths
Expand Down

0 comments on commit 2f12ce0

Please sign in to comment.