From f8d2f30029ff7ad959d518c26f73fbc0a8bacaeb Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Tue, 25 Nov 2025 11:50:48 -0500
Subject: [PATCH 1/3] react to comments

---
 .../azcosmos/cosmos_container_query_engine.go | 77 +++++++++++--------
 1 file changed, 47 insertions(+), 30 deletions(-)

diff --git a/sdk/data/azcosmos/cosmos_container_query_engine.go b/sdk/data/azcosmos/cosmos_container_query_engine.go
index 406f38dfa48a..8465eda21abc 100644
--- a/sdk/data/azcosmos/cosmos_container_query_engine.go
+++ b/sdk/data/azcosmos/cosmos_container_query_engine.go
@@ -182,7 +182,7 @@ func (c *ContainerClient) executeQueryWithEngine(queryEngine queryengine.QueryEn
 // runEngineRequests concurrently executes per-partition QueryRequests for either query or readMany pipelines.
 // prepareFn returns the query text, parameters, and a drain flag for each request.
-// It serializes ProvideData calls through a single goroutine to preserve ordering guarantees required by the pipeline.
+// Collects all results and calls ProvideData once with a single batch to reduce CGo overhead.
 func runEngineRequests(
 ctx context.Context,
 c *ContainerClient,
@@ -192,32 +192,16 @@ func runEngineRequests(
 requests []queryengine.QueryRequest,
 concurrency int,
 prepareFn func(req queryengine.QueryRequest) (query string, params []QueryParameter, drain bool),
-) (totalCharge float32, err error) {
+) (float32, error) {
 if len(requests) == 0 {
 return 0, nil
 }
 jobs := make(chan queryengine.QueryRequest, len(requests))
- provideCh := make(chan []queryengine.QueryResult)
+ resultsCh := make(chan queryengine.QueryResult)
 errCh := make(chan error, 1)
 done := make(chan struct{})
- providerDone := make(chan struct{})
 var wg sync.WaitGroup
- var chargeMu sync.Mutex
-
- // Provider goroutine ensures only one ProvideData executes at a time.
- go func() {
- defer close(providerDone)
- for batch := range provideCh {
- if perr := pipeline.ProvideData(batch); perr != nil {
- select {
- case errCh <- perr:
- default:
- }
- return
- }
- }
- }()
 // Adjust concurrency.
 workerCount := concurrency
@@ -228,9 +212,25 @@ func runEngineRequests(
 workerCount = 1
 }
+ // Per-worker request charge slots (no lock needed)
+ charges := make([]float32, workerCount)
+
+ // Collector goroutine gathers all results
+ var allResults []queryengine.QueryResult
+ var resultsMu sync.Mutex
+ collectorDone := make(chan struct{})
+ go func() {
+ defer close(collectorDone)
+ for result := range resultsCh {
+ resultsMu.Lock()
+ allResults = append(allResults, result)
+ resultsMu.Unlock()
+ }
+ }()
+
 for w := 0; w < workerCount; w++ {
 wg.Add(1)
- go func() {
+ go func(workerIndex int) {
 defer wg.Done()
 for {
 select {
@@ -274,9 +274,7 @@ func runEngineRequests(
 }
 return
 }
- chargeMu.Lock()
- totalCharge += qResp.RequestCharge
- chargeMu.Unlock()
+ charges[workerIndex] += qResp.RequestCharge
 // Load the data into a buffer to send it to the pipeline
 buf := new(bytes.Buffer)
@@ -302,12 +300,12 @@ func runEngineRequests(
 select {
 case <-done:
 return
- case provideCh <- []queryengine.QueryResult{result}:
+ case resultsCh <- result:
 }
 }
 }
 }
- }()
+ }(w)
 }
 // Feed jobs
@@ -323,8 +321,17 @@ func runEngineRequests(
 close(jobs)
 }()
- // Close provider after workers finish
- go func() { wg.Wait(); close(provideCh) }()
+ // Close results channel after workers finish
+ go func() { wg.Wait(); close(resultsCh) }()
+
+ // Helper to sum charges
+ sumCharges := func() float32 {
+ var total float32
+ for _, charge := range charges {
+ total += charge
+ }
+ return total
+ }
 // Wait for completion / error / cancellation
 select {
@@ -334,15 +341,25 @@ func runEngineRequests(
 default:
 close(done)
 }
- return totalCharge, e
+ return sumCharges(), e
 case <-ctx.Done():
 select {
 case <-done:
 default:
 close(done)
 }
- return totalCharge, ctx.Err()
- case <-providerDone:
+ return sumCharges(), ctx.Err()
+ case <-collectorDone:
+ }
+
+ // Sum up all worker charges
+ totalCharge := sumCharges()
+
+ // Provide all collected results in a single batch
+ if len(allResults) > 0 {
+ if err := pipeline.ProvideData(allResults); err != nil {
+ return totalCharge, err
+ }
 }
 return totalCharge, nil

From e81d0e808a407772243bdce57eb62e35fe85ba50 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Tue, 25 Nov 2025 12:29:57 -0500
Subject: [PATCH 2/3] update changelog

---
 sdk/data/azcosmos/CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md
index 4ccf5eabf2fa..d86fd65eff13 100644
--- a/sdk/data/azcosmos/CHANGELOG.md
+++ b/sdk/data/azcosmos/CHANGELOG.md
@@ -9,6 +9,7 @@
 ### Bugs Fixed
 ### Other Changes
+* Small performance optimizations to APIs using the query engine. See [PR 25669](https://github.com/Azure/azure-sdk-for-go/pull/25669)
 ## 1.5.0-beta.4 (2025-11-24)

From 42c7d84e5930aa5ddc3441aec970dd64c949d533 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Tue, 25 Nov 2025 12:43:36 -0500
Subject: [PATCH 3/3] react to copilot comments

---
 .../azcosmos/cosmos_container_query_engine.go | 63 ++++++++-----------
 1 file changed, 25 insertions(+), 38 deletions(-)

diff --git a/sdk/data/azcosmos/cosmos_container_query_engine.go b/sdk/data/azcosmos/cosmos_container_query_engine.go
index 8465eda21abc..9af2f72ab774 100644
--- a/sdk/data/azcosmos/cosmos_container_query_engine.go
+++ b/sdk/data/azcosmos/cosmos_container_query_engine.go
@@ -198,7 +198,6 @@ func runEngineRequests(
 }
 jobs := make(chan queryengine.QueryRequest, len(requests))
- resultsCh := make(chan queryengine.QueryResult)
 errCh := make(chan error, 1)
 done := make(chan struct{})
 var wg sync.WaitGroup
@@ -212,26 +211,15 @@ func runEngineRequests(
 workerCount = 1
 }
- // Per-worker request charge slots (no lock needed)
+ // Per-worker request charge slots and result slices (lock-free updates)
 charges := make([]float32, workerCount)
-
- // Collector goroutine gathers all results
- var allResults []queryengine.QueryResult
- var resultsMu sync.Mutex
- collectorDone := make(chan struct{})
- go func() {
- defer close(collectorDone)
- for result := range resultsCh {
- resultsMu.Lock()
- allResults = append(allResults, result)
- resultsMu.Unlock()
- }
- }()
+ resultsSlices := make([][]queryengine.QueryResult, workerCount)
 for w := 0; w < workerCount; w++ {
 wg.Add(1)
 go func(workerIndex int) {
 defer wg.Done()
+ localResults := make([]queryengine.QueryResult, 0, 8)
 for {
 select {
@@ -240,12 +228,12 @@ func runEngineRequests(
 case <-done:
 return
 case req, ok := <-jobs:
 if !ok {
+ // jobs exhausted
+ resultsSlices[workerIndex] = localResults
 return
 }
- log.Writef(azlog.EventRequest, "Engine pipeline requested data for PKRange: %s", req.PartitionKeyRangeID)
 queryText, params, drain := prepareFn(req)
- // Pagination loop
 fetchMorePages := true
 for fetchMorePages {
 qr := queryRequest(req)
@@ -265,7 +253,6 @@ func runEngineRequests(
 }
 return
 }
-
 qResp, err := newQueryResponse(azResponse)
 if err != nil {
 select {
@@ -275,8 +262,6 @@ func runEngineRequests(
 return
 }
 charges[workerIndex] += qResp.RequestCharge
-
- // Load the data into a buffer to send it to the pipeline
 buf := new(bytes.Buffer)
 if _, err := buf.ReadFrom(azResponse.Body); err != nil {
 select {
@@ -288,20 +273,13 @@ func runEngineRequests(
 continuation := azResponse.Header.Get(cosmosHeaderContinuationToken)
 data := buf.Bytes()
 fetchMorePages = continuation != "" && drain
-
- // Provide the data to the pipeline, make sure it's tagged with the partition key range ID so the pipeline can merge it into the correct partition.
- result := queryengine.QueryResult{
+ localResults = append(localResults, queryengine.QueryResult{
 PartitionKeyRangeID: req.PartitionKeyRangeID,
 NextContinuation: continuation,
 Data: data,
 RequestId: req.Id,
- }
+ })
 log.Writef(EventQueryEngine, "Received response for PKRange: %s. Continuation present: %v", req.PartitionKeyRangeID, continuation != "")
- select {
- case <-done:
- return
- case resultsCh <- result:
- }
 }
 }
 }
@@ -321,14 +299,15 @@ func runEngineRequests(
 close(jobs)
 }()
- // Close results channel after workers finish
- go func() { wg.Wait(); close(resultsCh) }()
+ // Wait for workers to finish (or error/cancel)
+ workersDone := make(chan struct{})
+ go func() { wg.Wait(); close(workersDone) }()
 // Helper to sum charges
 sumCharges := func() float32 {
 var total float32
- for _, charge := range charges {
- total += charge
+ for _, cval := range charges {
+ total += cval
 }
 return total
 }
@@ -349,15 +328,23 @@ func runEngineRequests(
 close(done)
 }
 return sumCharges(), ctx.Err()
- case <-collectorDone:
+ case <-workersDone:
 }
- // Sum up all worker charges
 totalCharge := sumCharges()
- // Provide all collected results in a single batch
- if len(allResults) > 0 {
- if err := pipeline.ProvideData(allResults); err != nil {
+ // Merge per-worker result slices deterministically
+ // Pre-size combined slice for efficiency
+ var combinedCount int
+ for _, rs := range resultsSlices {
+ combinedCount += len(rs)
+ }
+ if combinedCount > 0 {
+ all := make([]queryengine.QueryResult, 0, combinedCount)
+ for _, rs := range resultsSlices {
+ all = append(all, rs...)
+ }
+ if err := pipeline.ProvideData(all); err != nil {
 return totalCharge, err
 }
 }
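The shape these three commits converge on - a bounded worker pool where each worker accumulates its own results and request charges with no shared locks, then a single batched ProvideData call - can be sketched outside the SDK roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not the SDK code: result, provideData, runRequests, and the fixed 2.5 request charge are stand-ins for queryengine.QueryResult, the pipeline's ProvideData, and real per-partition HTTP responses; pagination, error channels, and cancellation are omitted.

// sketch.go - simplified fan-out/fan-in sketch of the per-worker accumulation pattern
package main

import (
	"fmt"
	"sync"
)

// result and provideData stand in for queryengine.QueryResult and the
// pipeline's ProvideData in the real SDK.
type result struct {
	PartitionKeyRangeID string
	Data                string
}

func provideData(batch []result) error {
	fmt.Printf("delivered %d results in one batch\n", len(batch))
	return nil
}

func runRequests(pkRanges []string, workerCount int) (float32, error) {
	// Buffered job channel, closed once all requests are queued.
	jobs := make(chan string, len(pkRanges))
	for _, r := range pkRanges {
		jobs <- r
	}
	close(jobs)

	charges := make([]float32, workerCount)   // one charge slot per worker, no mutex needed
	perWorker := make([][]result, workerCount) // one result slice per worker

	var wg sync.WaitGroup
	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			local := make([]result, 0, 8)
			for pkRange := range jobs {
				// A real worker would issue the query here; we fake a
				// response body and a request charge.
				charges[i] += 2.5
				local = append(local, result{PartitionKeyRangeID: pkRange, Data: "{...}"})
			}
			perWorker[i] = local // published only after the worker is done
		}(w)
	}
	wg.Wait()

	// Merge per-worker slices and charges, then hand everything over in one batch.
	var total float32
	var all []result
	for i, rs := range perWorker {
		total += charges[i]
		all = append(all, rs...)
	}
	if len(all) > 0 {
		if err := provideData(all); err != nil {
			return total, err
		}
	}
	return total, nil
}

func main() {
	charge, err := runRequests([]string{"0", "1", "2", "3"}, 2)
	fmt.Println("total charge:", charge, "err:", err)
}

Funneling everything into one ProvideData call matters in the real code because, per the updated doc comment, each ProvideData call crosses into the query engine (CGo), so one large batch is cheaper than one call per partition result.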