Skip to content

Commit

Permalink
Group subscription updates (#4422)
Browse files Browse the repository at this point in the history
<!-- Link the GitHub or AzDO issue this pull request is associated with.
Please copy and paste the full URL rather than using the
dotnet/arcade-services# syntax -->
#4336
This PR makes the service group dependency updates by keeping track of
the commit that was in the latest subscription trigger.
For example, let's say we trigger a subscription (we'll call it trigger
1) that has an already existing PR, but that PR can't be updated. We set
a reminder to try again later. Now let's say we trigger the subscription
again (trigger 2), again, but the PR is still not updatable.
This PR makes it so that the update caused by the trigger 1 doesn't get
processed, because a newer update from trigger 2 should be processed
next

---------

Co-authored-by: Přemek Vysoký <[email protected]>
  • Loading branch information
dkurepa and premun authored Feb 10, 2025
1 parent 7795021 commit 5ffc662
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ Task<bool> CheckPullRequestAsync(
PullRequestCheck pullRequestCheck);

Task<bool> ProcessPendingUpdatesAsync(
SubscriptionUpdateWorkItem update);
SubscriptionUpdateWorkItem update,
bool forceApply);

Task<bool> UpdateAssetsAsync(
Guid subscriptionId,
SubscriptionType type,
int buildId,
string sourceRepo,
string sourceSha,
List<Asset> assets);
List<Asset> assets,
bool forceApply);

PullRequestUpdaterId Id { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class InProgressPullRequest : DependencyFlowWorkItem, IPullRequest

[DataMember]
public InProgressPullRequestState MergeState { get; set; }

[DataMember]
public Dictionary<Guid, int> NextBuildsToProcess { get; set; } = [];
}

public enum InProgressPullRequestState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,37 @@ public async Task<bool> UpdateAssetsAsync(
int buildId,
string sourceRepo,
string sourceSha,
List<Asset> assets)
List<Asset> assets,
bool forceApply)
{
return await ProcessPendingUpdatesAsync(new()
{
UpdaterId = Id.ToString(),
SubscriptionId = subscriptionId,
SubscriptionType = type,
BuildId = buildId,
SourceSha = sourceSha,
SourceRepo = sourceRepo,
Assets = assets,
IsCoherencyUpdate = false,
});
return await ProcessPendingUpdatesAsync(
new()
{
UpdaterId = Id.ToString(),
SubscriptionId = subscriptionId,
SubscriptionType = type,
BuildId = buildId,
SourceSha = sourceSha,
SourceRepo = sourceRepo,
Assets = assets,
IsCoherencyUpdate = false,
},
forceApply);
}

/// <summary>
/// Process any pending pull request updates.
/// </summary>
/// <param name="forceApply">If false, we will check if this build is the latest one we have queued. If it's not we will skip this update.</param>
/// <returns>
/// True if updates have been applied; <see langword="false" /> otherwise.
/// </returns>
public async Task<bool> ProcessPendingUpdatesAsync(SubscriptionUpdateWorkItem update)
public async Task<bool> ProcessPendingUpdatesAsync(SubscriptionUpdateWorkItem update, bool forceApply)
{
_logger.LogInformation("Processing pending updates for subscription {subscriptionId}", update.SubscriptionId);
// Check if we track an on-going PR already
InProgressPullRequest? pr = await _pullRequestState.TryGetStateAsync();

bool isCodeFlow = update.SubscriptionType == SubscriptionType.DependenciesAndSources;

if (pr == null)
Expand All @@ -149,6 +154,18 @@ public async Task<bool> ProcessPendingUpdatesAsync(SubscriptionUpdateWorkItem up
}
else
{
if (!forceApply &&
pr.NextBuildsToProcess != null &&
pr.NextBuildsToProcess.TryGetValue(update.SubscriptionId, out int buildId) &&
buildId != update.BuildId)
{
_logger.LogInformation("Skipping update for subscription {subscriptionId} with build {oldBuild} because an update with a newer build {newBuild} has already been queued.",
update.SubscriptionId,
update.BuildId,
pr.NextBuildsToProcess);
return true;
}

var prStatus = await GetPullRequestStatusAsync(pr, isCodeFlow);
switch (prStatus)
{
Expand All @@ -161,9 +178,7 @@ public async Task<bool> ProcessPendingUpdatesAsync(SubscriptionUpdateWorkItem up
// If we can update it, we will do it below
break;
case PullRequestStatus.InProgressCannotUpdate:
_logger.LogInformation("PR {url} for subscription {subscriptionId} cannot be updated at this time. Deferring update..", pr.Url, update.SubscriptionId);
await _pullRequestUpdateReminders.SetReminderAsync(update, DefaultReminderDelay, isCodeFlow);
await _pullRequestCheckReminders.UnsetReminderAsync(isCodeFlow);
await ScheduleUpdateForLater(pr, update, isCodeFlow);
return false;
default:
throw new NotImplementedException($"Unknown PR status {prStatus}");
Expand Down Expand Up @@ -636,6 +651,7 @@ await AddDependencyFlowEventsAsync(

await darcRemote.UpdatePullRequestAsync(pr.Url, pullRequest);
pr.LastUpdate = DateTime.UtcNow;
pr.NextBuildsToProcess.Remove(update.SubscriptionId);
await SetPullRequestCheckReminder(pr, isCodeFlow: update.SubscriptionType == SubscriptionType.DependenciesAndSources);

_logger.LogInformation("Pull request '{prUrl}' updated", pr.Url);
Expand Down Expand Up @@ -866,6 +882,15 @@ private async Task ClearAllStateAsync(bool isCodeFlow, bool clearPendingUpdates)
return alteredUpdates;
}

private async Task ScheduleUpdateForLater(InProgressPullRequest pr, SubscriptionUpdateWorkItem update, bool isCodeFlow)
{
_logger.LogInformation("PR {url} for subscription {subscriptionId} cannot be updated at this time. Deferring update..", pr.Url, update.SubscriptionId);
await _pullRequestUpdateReminders.SetReminderAsync(update, DefaultReminderDelay, isCodeFlow);
await _pullRequestCheckReminders.UnsetReminderAsync(isCodeFlow);
pr.NextBuildsToProcess[update.SubscriptionId] = update.BuildId;
await _pullRequestState.SetAsync(pr);
}

#region Code flow subscriptions

/// <summary>
Expand Down Expand Up @@ -1242,6 +1267,7 @@ private async Task<bool> HandlePrUpdateConflictAsync(

pr.MergeState = InProgressPullRequestState.Conflict;
pr.SourceSha = remoteCommit;
pr.NextBuildsToProcess[update.SubscriptionId] = update.BuildId;
await _pullRequestState.SetAsync(pr);
await _pullRequestUpdateReminders.SetReminderAsync(update, DefaultReminderDelay, isCodeFlow: true);
await _pullRequestCheckReminders.UnsetReminderAsync(isCodeFlow: true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ await pullRequestUpdater.UpdateAssetsAsync(
build.Id,
build.GetRepository(),
build.Commit,
assets);
assets,
forceApply: true);

_logger.LogInformation("Asset update complete for {subscriptionId}", _subscriptionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public override async Task<bool> ProcessWorkItemAsync(
CancellationToken cancellationToken)
{
var updater = _updaterFactory.CreatePullRequestUpdater(PullRequestUpdaterId.Parse(workItem.UpdaterId));
await updater.ProcessPendingUpdatesAsync(workItem);
await updater.ProcessPendingUpdatesAsync(workItem, forceApply: false);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public async Task PendingCodeFlowUpdatesNotUpdatablePr()
using (WithExistingCodeFlowPullRequest(build, canUpdate: false))
{
await WhenProcessPendingUpdatesAsyncIsCalled(build, isCodeFlow: true);
AndShouldHaveInProgressPullRequestState(build);
AndShouldHaveInProgressPullRequestState(build, build.Id);
}
}

Expand Down Expand Up @@ -100,6 +100,7 @@ public async Task PendingUpdatesInConflictWithCurrent()
AndShouldNotHavePullRequestCheckReminder();
AndShouldHaveInProgressPullRequestState(
oldBuild,
nextBuildToProcess: newBuild.Id,
overwriteBuildCommit: ConflictPRRemoteSha,
prState: InProgressPullRequestState.Conflict);
}
Expand All @@ -125,6 +126,7 @@ public async Task PendingUpdateNotUpdatablePrAlreadyInConflict()
AndShouldNotHavePullRequestCheckReminder();
AndShouldHaveInProgressPullRequestState(
build,
nextBuildToProcess: build.Id,
overwriteBuildCommit: ConflictPRRemoteSha,
prState: InProgressPullRequestState.Conflict);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ namespace ProductConstructionService.DependencyFlow.Tests;

internal abstract class PendingUpdatePullRequestUpdaterTests : PullRequestUpdaterTests
{
protected async Task WhenProcessPendingUpdatesAsyncIsCalled(Build forBuild, bool isCodeFlow = false)
protected async Task WhenProcessPendingUpdatesAsyncIsCalled(
Build forBuild,
bool isCodeFlow = false,
bool forceApply = true)
{
await Execute(
async context =>
{
IPullRequestUpdater updater = CreatePullRequestActor(context);
await updater.ProcessPendingUpdatesAsync(CreateSubscriptionUpdate(forBuild, isCodeFlow));
await updater.ProcessPendingUpdatesAsync(CreateSubscriptionUpdate(forBuild, isCodeFlow), forceApply);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,80 @@ public async Task PendingUpdatesUpdatablePr()
AndShouldHavePullRequestCheckReminder();
}
}

[Test]
public async Task PendingUpdatesNotUpdatableGroupingTest()
{
GivenATestChannel();
GivenASubscription(
new SubscriptionPolicy
{
Batchable = true,
UpdateFrequency = UpdateFrequency.EveryBuild
});
Build b1 = GivenANewBuild(true);
Build b2 = GivenANewBuild(true);
b2.Id = 2;

using (WithExistingPullRequest(b1, canUpdate: false))
{
await WhenProcessPendingUpdatesAsyncIsCalled(b2);

ThenShouldHaveInProgressPullRequestState(b1, b2.Id);
ThenShouldHavePendingUpdateState(b2, isCodeFlow: false);
AndShouldNotHavePullRequestCheckReminder();
}
}

[Test]
public async Task PendingUpdatesShouldNotBeProcessedUnlessNewerBuildQueued()
{
GivenATestChannel();
GivenASubscription(
new SubscriptionPolicy
{
Batchable = true,
UpdateFrequency = UpdateFrequency.EveryBuild
});
Build b1 = GivenANewBuild(true);
Build b2 = GivenANewBuild(true);
b2.Id = 2;
Build b3 = GivenANewBuild(true);
b3.Id = 3;

using (WithExistingPullRequest(b1, canUpdate: true, nextBuildToProcess: b3.Id, setupRemoteMock: false))
{
await WhenProcessPendingUpdatesAsyncIsCalled(b2, forceApply: false);

ThenShouldHaveInProgressPullRequestState(b1, b3.Id);
AndShouldHaveNoPendingUpdateState();
AndShouldNotHavePullRequestCheckReminder();
}
}

[Test]
public async Task PendingUpdatesShouldBeProcessedWhenNewestBuildPending()
{
GivenATestChannel();
GivenASubscription(
new SubscriptionPolicy
{
Batchable = true,
UpdateFrequency = UpdateFrequency.EveryBuild
});
Build b1 = GivenANewBuild(true);
Build b2 = GivenANewBuild(true);
b2.Id = 2;

WithRequireNonCoherencyUpdates();
WithNoRequiredCoherencyUpdates();
using (WithExistingPullRequest(b1, canUpdate: true, nextBuildToProcess: b2.Id, setupRemoteMock: true))
{
await WhenProcessPendingUpdatesAsyncIsCalled(b2, forceApply: false);

ThenShouldHaveInProgressPullRequestState(b2);
AndShouldHaveNoPendingUpdateState();
AndShouldHavePullRequestCheckReminder();
}
}
}
Loading

0 comments on commit 5ffc662

Please sign in to comment.