Skip to content

Markduckworth/pipeline options #8919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: feat/pipelines
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 70 additions & 26 deletions packages/firestore/src/api/pipeline_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@

import { Pipeline } from '../api/pipeline';
import { firestoreClientExecutePipeline } from '../core/firestore_client';
import {
StructuredPipeline,
StructuredPipelineOptions
} from '../core/structured_pipeline';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result';
import { PipelineSource } from '../lite-api/pipeline-source';
import { PipelineOptions } from '../lite-api/pipeline_settings';
import { Stage } from '../lite-api/stage';
import { newUserDataReader } from '../lite-api/user_data_reader';
import {
newUserDataReader,
parseData,
UserDataReader,
UserDataSource
} from '../lite-api/user_data_reader';
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
import { cast } from '../util/input_validation';

import { ensureFirestoreConfigured, Firestore } from './database';
Expand Down Expand Up @@ -68,35 +79,68 @@ declare module './database' {
* @param pipeline The pipeline to execute.
* @return A Promise representing the asynchronous pipeline execution.
*/
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot>;
export function execute(options: PipelineOptions): Promise<PipelineSnapshot>;
export function execute(
pipelineOrOptions: LitePipeline | PipelineOptions
): Promise<PipelineSnapshot> {
const pipeline: LitePipeline =
pipelineOrOptions instanceof LitePipeline
? pipelineOrOptions
: pipelineOrOptions.pipeline;
const options: StructuredPipelineOptions = !(
pipelineOrOptions instanceof LitePipeline
)
? pipelineOrOptions
: {};
const genericOptions: { [name: string]: unknown } =
(pipelineOrOptions as PipelineOptions).genericOptions ?? {};

const firestore = cast(pipeline._db, Firestore);
const client = ensureFirestoreConfigured(firestore);
return firestoreClientExecutePipeline(client, pipeline).then(result => {
// Get the execution time from the first result.
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
// even if the returned document set is empty.
const executionTime =
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;

const docs = result
// Currently ignore any response from ExecutePipeline that does
// not contain any document data in the `fields` property.
.filter(element => !!element.fields)
.map(
element =>
new PipelineResult(
pipeline._userDataWriter,
element.key?.path
? new DocumentReference(firestore, null, element.key)
: undefined,
element.fields,
element.createTime?.toTimestamp(),
element.updateTime?.toTimestamp()
)
);
const udr = new UserDataReader(
firestore._databaseId,
/* ignoreUndefinedProperties */ true
);
const context = udr.createContext(UserDataSource.Argument, 'execute');
const optionsOverride: ApiClientObjectMap<Value> =
parseData(genericOptions, context)?.mapValue?.fields ?? {};

return new PipelineSnapshot(pipeline, docs, executionTime);
});
const structuredPipeline: StructuredPipeline = new StructuredPipeline(
pipeline,
options,
optionsOverride
);

return firestoreClientExecutePipeline(client, structuredPipeline).then(
result => {
// Get the execution time from the first result.
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
// even if the returned document set is empty.
const executionTime =
result.length > 0 ? result[0].executionTime?.toTimestamp() : undefined;

const docs = result
// Currently ignore any response from ExecutePipeline that does
// not contain any document data in the `fields` property.
.filter(element => !!element.fields)
.map(
element =>
new PipelineResult(
pipeline._userDataWriter,
element.key?.path
? new DocumentReference(firestore, null, element.key)
: undefined,
element.fields,
element.createTime?.toTimestamp(),
element.updateTime?.toTimestamp()
)
);

return new PipelineSnapshot(pipeline, docs, executionTime);
}
);
}

// Augment the Firestore class with the pipeline() factory method
Expand Down
4 changes: 2 additions & 2 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
CredentialsProvider
} from '../api/credentials';
import { User } from '../auth/user';
import { Pipeline } from '../lite-api/pipeline';
import { LocalStore } from '../local/local_store';
import {
localStoreConfigureFieldIndexes,
Expand Down Expand Up @@ -87,6 +86,7 @@ import {
removeSnapshotsInSyncListener
} from './event_manager';
import { newQueryForPath, Query } from './query';
import { StructuredPipeline } from './structured_pipeline';
import { SyncEngine } from './sync_engine';
import {
syncEngineListen,
Expand Down Expand Up @@ -557,7 +557,7 @@ export function firestoreClientRunAggregateQuery(

export function firestoreClientExecutePipeline(
client: FirestoreClient,
pipeline: Pipeline
pipeline: StructuredPipeline
): Promise<PipelineStreamElement[]> {
const deferred = new Deferred<PipelineStreamElement[]>();

Expand Down
80 changes: 80 additions & 0 deletions packages/firestore/src/core/structured_pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* @license
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ObjectValue } from '../model/object_value';
import { FieldPath } from '../model/path';
import {
StructuredPipeline as StructuredPipelineProto,
Pipeline as PipelineProto,
ApiClientObjectMap,
Value
} from '../protos/firestore_proto_api';
import { JsonProtoSerializer, ProtoSerializable } from '../remote/serializer';
import { mapToArray } from '../util/obj';

export interface StructuredPipelineOptions {
indexMode?: 'recommended';
}

export class StructuredPipeline
implements ProtoSerializable<StructuredPipelineProto>
{
constructor(
private pipeline: ProtoSerializable<PipelineProto>,
private options: StructuredPipelineOptions,
private optionsOverride: ApiClientObjectMap<Value>
) {}

/**
* @private
* @internal for testing
*/
_getKnownOptions(): ObjectValue {
const options: ObjectValue = ObjectValue.empty();

// SERIALIZE KNOWN OPTIONS
if (typeof this.options.indexMode === 'string') {
options.set(FieldPath.fromServerFormat('index_mode'), {
stringValue: this.options.indexMode
});
}

return options;
}

private getOptionsProto(): ApiClientObjectMap<Value> {
const options: ObjectValue = this._getKnownOptions();

// APPLY OPTIONS OVERRIDES
const optionsMap = new Map(
mapToArray(this.optionsOverride, (value, key) => [
FieldPath.fromServerFormat(key),
value
])
);
options.setAll(optionsMap);

return options.value.mapValue.fields ?? {};
}

_toProto(serializer: JsonProtoSerializer): StructuredPipelineProto {
return {
pipeline: this.pipeline._toProto(serializer),
options: this.getOptionsProto()
};
}
}
61 changes: 61 additions & 0 deletions packages/firestore/src/lite-api/pipeline_settings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type { Pipeline } from './pipeline';

/**
* Options defining how a Pipeline is evaluated.
*/
export interface PipelineOptions {
/**
* Pipeline to be evaluated.
*/
pipeline: Pipeline;

/**
* Specify the index mode.
*/
indexMode?: 'recommended';

/**
* An escape hatch to set options not known at SDK build time. These values
* will be passed directly to the Firestore backend and not used by the SDK.
*
* The generic option name will be used as provided. And must match the name
* format used by the backend (hint: use a snake_case_name).
*
* Generic option values can be any type supported
* by Firestore (for example: string, boolean, number, map, …). Value types
* not known to the SDK will be rejected.
*
* Values specified in genericOptions will take precedence over any options
* with the same name set by the SDK.
*
* Override the `example_option`:
* ```
* execute({
* pipeline: myPipeline,
* genericOptions: {
* // Override `example_option`. This will not
* // merge with the existing `example_option` object.
* "example_option": {
* foo: "bar"
* }
* }
* }
* ```
*
* `genericOptions` supports dot notation, if you want to override
* a nested option.
* ```
* execute({
* pipeline: myPipeline,
* genericOptions: {
* // Override `example_option.foo` and do not override
* // any other properties of `example_option`.
* "example_option.foo": "bar"
* }
* }
* ```
*/
genericOptions?: {
[name: string]: unknown;
};
}
8 changes: 3 additions & 5 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { User } from '../auth/user';
import { Aggregate } from '../core/aggregate';
import { DatabaseId } from '../core/database_info';
import { queryToAggregateTarget, Query, queryToTarget } from '../core/query';
import { Pipeline } from '../lite-api/pipeline';
import { StructuredPipeline } from '../core/structured_pipeline';
import { Document } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
Expand Down Expand Up @@ -242,14 +242,12 @@ export async function invokeBatchGetDocumentsRpc(

export async function invokeExecutePipeline(
datastore: Datastore,
pipeline: Pipeline
structuredPipeline: StructuredPipeline
): Promise<PipelineStreamElement[]> {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
const executePipelineRequest: ProtoExecutePipelineRequest = {
database: getEncodedDatabaseId(datastoreImpl.serializer),
structuredPipeline: {
pipeline: pipeline._toProto(datastoreImpl.serializer)
}
structuredPipeline: structuredPipeline._toProto(datastoreImpl.serializer)
};

const response = await datastoreImpl.invokeStreamingRPC<
Expand Down
Loading
Loading