Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public virtual async Task<ResponseMessage> SendAsync(
else
{
Range<string> singleRange = overlappingRanges[0].ToRange();
Console.WriteLine($"RequestInvokerHandler.SendAsync - Using FeedRange: {singleRange}");
if ((singleRange.Min == feedRangeEpk.Range.Min) && (singleRange.Max == feedRangeEpk.Range.Max))
{
// 2) The EpkRange spans exactly one physical partition
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class OrderByQueryPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator<OrderByQueryPage, QueryState>, IPrefetcher
Expand All @@ -25,15 +26,17 @@ public static OrderByQueryPartitionRangePageAsyncEnumerator Create(
PartitionKey? partitionKey,
QueryExecutionOptions queryPaginationOptions,
string filter,
PrefetchPolicy prefetchPolicy)
PrefetchPolicy prefetchPolicy,
ContainerQueryProperties containerQueryProperties)
{
InnerEnumerator enumerator = new InnerEnumerator(
queryDataSource,
sqlQuerySpec,
feedRangeState,
partitionKey,
queryPaginationOptions,
filter);
filter,
containerQueryProperties);

BufferedPartitionRangePageAsyncEnumeratorBase<OrderByQueryPage, QueryState> bufferedEnumerator = prefetchPolicy switch
{
Expand Down Expand Up @@ -105,22 +108,25 @@ public OrderByQueryPartitionRangePageAsyncEnumerator CloneAsFullyBufferedEnumera

private sealed class InnerEnumerator : PartitionRangePageAsyncEnumerator<OrderByQueryPage, QueryState>
{
private readonly IQueryDataSource queryDataSource;
private readonly IQueryDataSource queryDataSource;
private readonly ContainerQueryProperties containerQueryProperties;

public InnerEnumerator(
IQueryDataSource queryDataSource,
SqlQuerySpec sqlQuerySpec,
FeedRangeState<QueryState> feedRangeState,
PartitionKey? partitionKey,
QueryExecutionOptions queryPaginationOptions,
string filter)
string filter,
ContainerQueryProperties containerQueryProperties)
: base(feedRangeState)
{
this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource));
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
this.PartitionKey = partitionKey;
this.QueryPaginationOptions = queryPaginationOptions ?? QueryExecutionOptions.Default;
this.Filter = filter;
this.containerQueryProperties = containerQueryProperties;
}

public SqlQuerySpec SqlQuerySpec { get; }
Expand All @@ -144,7 +150,8 @@ public InnerEnumerator CloneWithMaxPageSize()
this.FeedRangeState,
this.PartitionKey,
options,
this.Filter);
this.Filter,
this.containerQueryProperties);
}

public override ValueTask DisposeAsync() => default;
Expand All @@ -153,7 +160,7 @@ protected override async Task<TryCatch<OrderByQueryPage>> GetNextPageAsync(ITrac
{
// Unfortunately we need to keep both the epk range and partition key for queries
// Since the continuation token format uses epk range even though we only need the partition key to route the request.
FeedRangeInternal feedRange = this.PartitionKey.HasValue ? new FeedRangePartitionKey(this.PartitionKey.Value) : this.FeedRangeState.FeedRange;
FeedRangeInternal feedRange = CrossPartition.Parallel.QueryPartitionRangePageAsyncEnumerator.LimitFeedRangeToSinglePartition(this.PartitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties);

TryCatch<QueryPage> monadicQueryPage = await this.queryDataSource
.MonadicQueryAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected override Task<TryCatch<QueryPage>> GetNextPageAsync(ITrace trace, Canc
throw new ArgumentNullException(nameof(trace));
}

FeedRangeInternal feedRange = this.LimitFeedRangeToSinglePartition();
FeedRangeInternal feedRange = LimitFeedRangeToSinglePartition(this.partitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties);
return this.queryDataSource.MonadicQueryAsync(
sqlQuerySpec: this.sqlQuerySpec,
feedRangeState: new FeedRangeState<QueryState>(feedRange, this.FeedRangeState.State),
Expand All @@ -63,29 +63,28 @@ protected override Task<TryCatch<QueryPage>> GetNextPageAsync(ITrace trace, Canc
/// Since such an epk range does not exist at the container level, Service generates a GoneException.
/// This method restrics the range of each container by shrinking the ends of the range so that they do not span across physical partition.
/// </summary>
private FeedRangeInternal LimitFeedRangeToSinglePartition()
internal static FeedRangeInternal LimitFeedRangeToSinglePartition(PartitionKey? partitionKey, FeedRangeInternal feedRange, ContainerQueryProperties containerQueryProperties)
{
// We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token.
// In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state).
FeedRangeInternal feedRange = this.FeedRangeState.FeedRange;
if (this.partitionKey.HasValue)
if (partitionKey.HasValue)
{
// ISSUE-HACK-adityasa-3/25/2024 - We should not update the original feed range inside this class.
// Instead we should guarantee that when enumerator is instantiated it is limited to a single physical partition.
// Ultimately we should remove enumerator's dependency on PartitionKey.
if ((this.containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) &&
(this.partitionKey.Value.InternalKey.Components.Count != this.containerQueryProperties.PartitionKeyDefinition.Paths.Count) &&
if ((containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) &&
(partitionKey.Value.InternalKey.Components.Count != containerQueryProperties.PartitionKeyDefinition.Paths.Count) &&
(feedRange is FeedRangeEpk feedRangeEpk))
{
if (this.containerQueryProperties.EffectiveRangesForPartitionKey == null ||
this.containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0)
if (containerQueryProperties.EffectiveRangesForPartitionKey == null ||
containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0)
{
throw new InvalidOperationException(
"EffectiveRangesForPartitionKey should be populated when PK is specified in request options.");
}

foreach (Documents.Routing.Range<String> epkForPartitionKey in
this.containerQueryProperties.EffectiveRangesForPartitionKey)
containerQueryProperties.EffectiveRangesForPartitionKey)
{
if (Documents.Routing.Range<String>.CheckOverlapping(
feedRangeEpk.Range,
Expand Down Expand Up @@ -138,7 +137,7 @@ private FeedRangeInternal LimitFeedRangeToSinglePartition()
}
else
{
feedRange = new FeedRangePartitionKey(this.partitionKey.Value);
feedRange = new FeedRangePartitionKey(partitionKey.Value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
queryPaginationOptions: queryPaginationOptions,
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
continuationToken: continuationToken);
continuationToken: continuationToken,
containerQueryProperties: containerQueryProperties);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
namespace Microsoft.Azure.Cosmos.EmulatorTests.Query
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
// using Microsoft.Azure.Cosmos.SDK.EmulatorTests;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json.Linq;

[TestClass]
public class TestUserRepro
{
[TestMethod]
public async Task InfiniteLoop()
{
await MainAsync(@"AccountEndpoint=https://pflb-dev-cosmosdb.documents.azure.com:443/;");
}

static async Task MainAsync(params string[] args)
{
if (args.Length == 0 || String.IsNullOrWhiteSpace(args[0]))
{
throw new ArgumentException("First command line argument needs to be the read-only connection string.", "args[0]");
}

using CosmosClient client = new CosmosClient(args[0]);
Container container = client.GetContainer("Leaderboard", "Data");
await Correct_ButUnintuitive_QueryResultsAsync(container);

// BUG BUG BUG
// Using QueryRequestOptions.PartitionKey just hangs when a HPK spans more than one Physical partition
await Hang_FilteringByRequestOptionsWithPKAsync(container);

// BUG BUG BUG
// Using FeedRange of partial HPK to filter query results does not work
// await FunctionalBug_NotFilteringByEpkAsync(container);
}

static async Task Correct_ButUnintuitive_QueryResultsAsync(Container container)
{
Console.WriteLine();
Console.WriteLine("Correct_ButUnintuitive_QueryResultsAsync");
Console.WriteLine("-------------------------------------------");

var shardId = "0";
var leaderboardId = "095bceb0-af08-432b-adaf-3f81bfaf01ec:0:::";
//var leaderboardId = "0d09115f-12bd-4635-9889-b19bef9d7dfd:0:::";

var queryDefinition = new QueryDefinition(
$"SELECT * FROM c "
+ $"WHERE c.ShardId = @shardId "
+ $"AND c.LeaderboardId = @leaderboardId")
.WithParameter("@shardId", shardId)
.WithParameter("@leaderboardId", leaderboardId);

var partitionKey = new PartitionKeyBuilder().Add(shardId).Add(leaderboardId).Build();

int resultCount = await ExecuteQueryWithHPKAsync(container, queryDefinition, pk: null, feedRange: null);
Debug.Assert(resultCount == 10000);
}

static async Task Hang_FilteringByRequestOptionsWithPKAsync(Container container)
{
Console.WriteLine();
Console.WriteLine("Hang_FilteringByRequestOptionsWithPKAsync");
Console.WriteLine("-------------------------------------------");


var shardId = "0";
var leaderboardId = "095bceb0-af08-432b-adaf-3f81bfaf01ec:0:::";
//var leaderboardId = "0d09115f-12bd-4635-9889-b19bef9d7dfd:0:::";

var queryDefinition = new QueryDefinition(
$"SELECT * FROM c ");

var partitionKey = new PartitionKeyBuilder().Add(shardId).Add(leaderboardId).Build();

int resultCount = await ExecuteQueryWithHPKAsync(container, queryDefinition, pk: partitionKey, feedRange: null);
Debug.Assert(resultCount == 10000);
}

static async Task FunctionalBug_NotFilteringByEpkAsync(Container container)
{
Console.WriteLine();
Console.WriteLine("FunctionalBug_NotFilteringByEpkAsync");
Console.WriteLine("-------------------------------------------");

var shardId = "0";
var leaderboardId = "095bceb0-af08-432b-adaf-3f81bfaf01ec:0:::";
//var leaderboardId = "0d09115f-12bd-4635-9889-b19bef9d7dfd:0:::";

var queryDefinition = new QueryDefinition(
$"SELECT * FROM c ");

var partitionKey = new PartitionKeyBuilder().Add(shardId).Add(leaderboardId).Build();

int resultCount = await ExecuteQueryWithHPKAsync(container, queryDefinition, pk: null, feedRange: FeedRange.FromPartitionKey(partitionKey));
Debug.Assert(resultCount == 10000);
}

static async Task<int> ExecuteQueryWithHPKAsync(
Container container,
QueryDefinition queryDefinition,
PartitionKey? pk,
#nullable enable
FeedRange? feedRange)
#nullable disable
{
var requestOptions = new QueryRequestOptions();
if (pk != null)
{
requestOptions.PartitionKey = pk;
}

List<JObject> results = new List<JObject>();
using var iterator = container.GetItemQueryIterator<JObject>(feedRange, queryDefinition, continuationToken: null, requestOptions: requestOptions);
int i = 1;
while (iterator.HasMoreResults)
{
var response = await iterator.ReadNextAsync();

Console.WriteLine($"Page {i}: PKRangeId : {response.Headers["x-ms-cosmos-physical-partition-id"]}");
Console.WriteLine($"Page {i}: ActivityId : {response.ActivityId}");
Console.WriteLine($"Page {i}: Total documents for Partition Key : {response.Count()}");
Console.WriteLine($"Page {i}: Request charge: {response.RequestCharge}");
results.AddRange(response.Resource);
i++;
}

return results.Count;
}
}
}
Loading