Skip to content

Commit

Permalink
ExtractChagneFeedContinuations (#42156)
Browse files Browse the repository at this point in the history
* add function to allow extract continuationTokens

---------

Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
xinlian12 and annie-mac authored Oct 8, 2024
1 parent df338bf commit fd14aab
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*
*/

package com.azure.cosmos;

import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.util.CosmosChangeFeedContinuationTokenUtils;
import com.fasterxml.jackson.databind.JsonNode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

public class ChangeFeedContinuationTokenUtilsTests extends TestSuiteBase {
private CosmosAsyncClient client;
private CosmosAsyncDatabase createdDatabase;

@Factory(dataProvider = "simpleClientBuildersWithDirect")
public ChangeFeedContinuationTokenUtilsTests(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}

@BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT)
public void before_CosmosContainerTest() {
client = getClientBuilder().buildAsyncClient();
createdDatabase = getSharedCosmosDatabase(client);
}

@Test(groups = { "emulator" }, timeOut = 2 * TIMEOUT)
public void extractContinuationTokens() {
// create a container with at least 3 partitions
String testContainerId = UUID.randomUUID().toString();
CosmosContainerProperties containerProperties =
new CosmosContainerProperties(testContainerId, "/mypk");

try {
CosmosAsyncContainer testContainer =
createCollection(this.createdDatabase, containerProperties, new CosmosContainerRequestOptions(), 18000);

List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
assertThat(feedRanges.size()).isEqualTo(3);

// create few items into the container
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem(UUID.randomUUID().toString())).block();
}

// validate items persisted on each feedRange
Map<FeedRange, List<String>> pkValuesByFeedRange = new ConcurrentHashMap<>();
for (FeedRange feedRange : feedRanges) {
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setFeedRange(feedRange);
List<String> pkValues = testContainer.readAllItems(cosmosQueryRequestOptions, TestItem.class)
.map(TestItem::getMypk)
.collectList()
.block();
assertThat(pkValues.size()).isGreaterThan(0);
pkValuesByFeedRange.put(feedRange, pkValues);
}

// do initial query change feed
AtomicReference<String> continuationToken = new AtomicReference<>();
testContainer
.queryChangeFeed(
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(
FeedRange.forFullRange()),
JsonNode.class)
.byPage()
.doOnNext(response -> {
continuationToken.set(response.getContinuationToken());
})
.blockLast();
assertThat(continuationToken.get()).isNotEmpty();
ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuationToken.get());
assertThat(changeFeedState).isInstanceOf(ChangeFeedStateV1.class);
ChangeFeedStateV1 changeFeedStateV1 = (ChangeFeedStateV1) changeFeedState;
assertThat(changeFeedStateV1.getContinuation().getContinuationTokenCount()).isEqualTo(3);

// create few more items on each feed range
List<String> expectedNewItems = new ArrayList<>();
for (FeedRange feedRange : pkValuesByFeedRange.keySet()) {
List<String> pkValues = pkValuesByFeedRange.get(feedRange);
for (int i = 0; i < 5; i++) {
TestItem testItem = TestItem.createNewItem(pkValues.get(0));
testContainer.createItem(testItem).block();
expectedNewItems.add(testItem.getId());
}
}

// extract continuation tokens and make sure we can still use the new continuation token to read back all items
List<Integer> expectedContinuationTokenCounts = Arrays.asList(null, -1, 0, 1, 2, 3, 4);
for (Integer expectedContinuationTokenCount : expectedContinuationTokenCounts) {
Map<FeedRange, String> extractedTokens =
CosmosChangeFeedContinuationTokenUtils.extractContinuationTokens(continuationToken.get(), expectedContinuationTokenCount);

if (expectedContinuationTokenCount == null ||
expectedContinuationTokenCount <= 0 ||
expectedContinuationTokenCount > 3) {
assertThat(extractedTokens.size()).isEqualTo(3);
} else {
assertThat(extractedTokens.size()).isEqualTo(expectedContinuationTokenCount);
}

List<String> fetchedItems = new ArrayList<>();
for (FeedRange feedRange : extractedTokens.keySet()) {
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(extractedTokens.get(feedRange));

testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.doOnNext(response -> {
fetchedItems.addAll(
response
.getResults()
.stream()
.map(TestItem::getId)
.collect(Collectors.toList()));
})
.blockLast();
}

assertThat(fetchedItems.size()).isEqualTo(expectedNewItems.size());
assertThat(fetchedItems.containsAll(expectedNewItems)).isTrue();
}
} finally {
this.createdDatabase
.getContainer(testContainerId)
.delete()
.onErrorResume(throwable -> {
logger.warn("Failed to delete container {}", testContainerId, throwable);
return Mono.empty();
})
.block();
}
}

@AfterClass(groups = { "emulator" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
logger.info("starting ....");
safeCloseAsync(this.client);
}

private static class TestItem {
private String id;
private String mypk;
private String prop;

public TestItem(){}

public TestItem(String id, String mypk, String prop) {
this.id = id;
this.mypk = mypk;
this.prop = prop;
}

public static TestItem createNewItem(String pkValue) {
return new TestItem(UUID.randomUUID().toString(), pkValue, UUID.randomUUID().toString());
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getMypk() {
return mypk;
}

public void setMypk(String mypk) {
this.mypk = mypk;
}

public String getProp() {
return prop;
}

public void setProp(String prop) {
this.prop = prop;
}
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Features Added
* Added an API to retrieve diagnostics from the change feed processor context. - See [PR 41738](https://github.com/Azure/azure-sdk-for-java/pull/41738)
* Added support to allow `queryChangeFeed` to complete when all changes available when the query starts have been fetched. - See [PR 42160](https://github.com/Azure/azure-sdk-for-java/pull/42160)
* Added an utility API to help extract sub-range continuation tokens from existing changeFeed query continuation token. - See [PR 42156](https://github.com/Azure/azure-sdk-for-java/pull/42156)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.util;

import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedRange;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public final class CosmosChangeFeedContinuationTokenUtils {
private CosmosChangeFeedContinuationTokenUtils() {}

/***
* Utility method to help extract continuation tokens for sub-feedRange
* @param changeFeedContinuationToken the original change feed continuation token being returned from queryChangeFeed.
* @return a map of sub-feedRange to its mapping continuation token string
*/
public static Map<FeedRange, String> extractContinuationTokens(String changeFeedContinuationToken) {
return extractContinuationTokens(changeFeedContinuationToken, -1);
}

/***
* Utility method to help extract continuation tokens for sub-range
* @param changeFeedContinuationToken the original change feed continuation token being returned from queryChangeFeed.
* @param targetedContinuationCount the targeted continuation token count.
* Max will be capped by the count of sub-feedRanges included in the continuation token.
* Using -1 to extract continuations for each sub-feedRanges.
* Using null will be same as using -1.
* @return a map of sub-feedRange to its mapping continuation token string
*/
public static Map<FeedRange, String> extractContinuationTokens(
String changeFeedContinuationToken,
Integer targetedContinuationCount) {

checkNotNull(changeFeedContinuationToken, "Argument 'changeFeedContinuationToken' cannot be null.");
if (targetedContinuationCount == null) {
targetedContinuationCount = -1;
}

final ChangeFeedState changeFeedState = ChangeFeedState.fromString(changeFeedContinuationToken);
List<CompositeContinuationToken> allTokens = changeFeedState.extractContinuationTokens();
allTokens.sort(new Comparator<CompositeContinuationToken>() {
@Override
public int compare(CompositeContinuationToken o1, CompositeContinuationToken o2) {
return o1.getRange().getMin().compareTo(o2.getRange().getMin());
}
});

Map<FeedRange, String> extractedContinuationTokenMap = new ConcurrentHashMap<>();
int effectiveTargetedContinuationCount =
targetedContinuationCount <= 0 ? allTokens.size() : Math.min(targetedContinuationCount, allTokens.size());
List<List<CompositeContinuationToken>> segmentedTokens =
getSegmentedTokens(allTokens, effectiveTargetedContinuationCount);
for (List<CompositeContinuationToken> segmentedToken : segmentedTokens) {
FeedRangeEpkImpl effectiveChildRange =
new FeedRangeEpkImpl(
new Range<>(
segmentedToken.get(0).getRange().getMin(),
segmentedToken.get(segmentedToken.size()-1).getRange().getMax(),
segmentedToken.get(0).getRange().isMinInclusive(),
segmentedToken.get(segmentedToken.size()-1).getRange().isMaxInclusive()));

ChangeFeedState newChildFeedRangeState = new ChangeFeedStateV1(
changeFeedState.getContainerRid(),
effectiveChildRange,
changeFeedState.getMode(),
changeFeedState.getStartFromSettings(),
FeedRangeContinuation.create(
changeFeedState.getContainerRid(),
effectiveChildRange,
segmentedToken
)
);

extractedContinuationTokenMap.put(effectiveChildRange, newChildFeedRangeState.toString());
}

return extractedContinuationTokenMap;
}

private static List<List<CompositeContinuationToken>> getSegmentedTokens(
List<CompositeContinuationToken> allTokens,
int targetedContinuationCount) {

List<List<CompositeContinuationToken>> segmentedTokens = new ArrayList<>();
int subListMinSize = allTokens.size() / targetedContinuationCount;
int remainingSize = allTokens.size() % targetedContinuationCount;

int subListStartIndex = 0;
for (int i = 1; i <= targetedContinuationCount; i++) {
int subListEndIndex = subListStartIndex + subListMinSize + (remainingSize > 0 ? 1 : 0);
segmentedTokens.add(new ArrayList<>(allTokens.subList(subListStartIndex, subListEndIndex)));
subListStartIndex = subListEndIndex;
remainingSize--;
}

return segmentedTokens;
}
}

0 comments on commit fd14aab

Please sign in to comment.