Skip to content

(#383) Capture the local exceptions in a pull operation. #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/CommunityToolkit.Datasync.Client/Offline/Models/PullResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using CommunityToolkit.Datasync.Client.Serialization;
using System.Collections.Concurrent;
using System.Collections.Immutable;

Expand All @@ -14,11 +15,12 @@ public class PullResult
{
private int _additions, _deletions, _replacements;
private readonly ConcurrentDictionary<Uri, ServiceResponse> _failedRequests = new();
private readonly ConcurrentDictionary<string, Exception> _localExceptions = new();

/// <summary>
/// Determines if the pull result was completely successful.
/// </summary>
public bool IsSuccessful { get => this._failedRequests.IsEmpty; }
public bool IsSuccessful { get => this._failedRequests.IsEmpty && this._localExceptions.IsEmpty; }

/// <summary>
/// The total count of operations performed on this pull operation.
Expand All @@ -41,13 +43,36 @@ public class PullResult
public int Replacements { get => this._replacements; }

/// <summary>
/// The list of failed requests.
/// The list of failed requests. The key is the request URI, and the value is
/// the <see cref="ServiceResponse"/> for that request.
/// </summary>
public IReadOnlyDictionary<Uri, ServiceResponse> FailedRequests { get => this._failedRequests.ToImmutableDictionary(); }

/// <summary>
/// The list of local exceptions. The key is the GUID of the entity that caused the exception,
/// and the value is the exception itself.
/// </summary>
public IReadOnlyDictionary<string, Exception> LocalExceptions { get => this._localExceptions.ToImmutableDictionary(); }

/// <summary>
/// Adds a failed request to the list of failed requests.
/// </summary>
/// <param name="requestUri">The request URI causing the failure.</param>
/// <param name="response">The response for the request.</param>
internal void AddFailedRequest(Uri requestUri, ServiceResponse response)
=> _ = this._failedRequests.TryAdd(requestUri, response);

/// <summary>
/// Adds a local exception to the list of local exceptions.
/// </summary>
/// <param name="entityMetadata">The entity metadata, or null if not available.</param>
/// <param name="exception">The exception that was thrown.</param>
internal void AddLocalException(EntityMetadata? entityMetadata, Exception exception)
{
string entityId = entityMetadata?.Id ?? $"NULL:{Guid.NewGuid():N}";
_ = this._localExceptions.TryAdd(entityId, exception);
}

internal void IncrementAdditions()
=> Interlocked.Increment(ref this._additions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,86 +54,108 @@

QueueHandler<PullResponse> databaseUpdateQueue = new(1, async pullResponse =>
{
if (pullResponse.Items.Any())
EntityMetadata? currentMetadata = null;

try
{
DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
foreach (object item in pullResponse.Items)
if (pullResponse.Items.Any())
{
EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);

if (originalEntity is null && !metadata.Deleted)
{
_ = context.Add(item);
result.IncrementAdditions();
}
else if (originalEntity is not null && metadata.Deleted)
DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
foreach (object item in pullResponse.Items)
{
_ = context.Remove(originalEntity);
result.IncrementDeletions();
}
else if (originalEntity is not null && !metadata.Deleted)
{
// Gather properties marked with [JsonIgnore]
HashSet<string> ignoredProps = pullResponse.EntityType
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
.Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
.Select(p => p.Name)
.ToHashSet();

EntityEntry originalEntry = context.Entry(originalEntity);
EntityEntry newEntry = context.Entry(item);
EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
currentMetadata = metadata;
object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);

// Only copy properties that are not marked with [JsonIgnore]
foreach (IProperty property in originalEntry.Metadata.GetProperties())
if (originalEntity is null && !metadata.Deleted)
{
_ = context.Add(item);
result.IncrementAdditions();
}
else if (originalEntity is not null && metadata.Deleted)
{
if (!ignoredProps.Contains(property.Name))
_ = context.Remove(originalEntity);
result.IncrementDeletions();
}
else if (originalEntity is not null && !metadata.Deleted)
{
// Gather properties marked with [JsonIgnore]
HashSet<string> ignoredProps = pullResponse.EntityType
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
.Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
.Select(p => p.Name)
.ToHashSet();

EntityEntry originalEntry = context.Entry(originalEntity);
EntityEntry newEntry = context.Entry(item);

// Only copy properties that are not marked with [JsonIgnore]
foreach (IProperty property in originalEntry.Metadata.GetProperties())
{
originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
if (!ignoredProps.Contains(property.Name))
{
originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
}
}

result.IncrementReplacements();
}

result.IncrementReplacements();
if (metadata.UpdatedAt > lastSynchronization)
{
lastSynchronization = metadata.UpdatedAt.Value;
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
if (isAdded)
{
// Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
}

Check warning on line 113 in src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs

View workflow job for this annotation

GitHub Actions / build

Blank line required between block and subsequent statement (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide2003)

Check warning on line 113 in src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs

View workflow job for this annotation

GitHub Actions / build

Blank line required between block and subsequent statement (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide2003)
currentMetadata = null;
}

if (metadata.UpdatedAt > lastSynchronization)
if (pullOptions.SaveAfterEveryServiceRequest)
{
lastSynchronization = metadata.UpdatedAt.Value;
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
if (isAdded)
{
// Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
}

if (pullOptions.SaveAfterEveryServiceRequest)
{
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.ItemsCommitted,
EntityType = pullResponse.EntityType,
ItemsProcessed = pullResponse.TotalItemsProcessed,
ItemsTotal = pullResponse.TotalRequestItems,
QueryId = pullResponse.QueryId
});
}

context.SendSynchronizationEvent(new SynchronizationEventArgs()
if (pullResponse.Completed)
{
EventType = SynchronizationEventType.ItemsCommitted,
EntityType = pullResponse.EntityType,
ItemsProcessed = pullResponse.TotalItemsProcessed,
ItemsTotal = pullResponse.TotalRequestItems,
QueryId = pullResponse.QueryId
});
context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.PullEnded,
EntityType = pullResponse.EntityType,
ItemsProcessed = pullResponse.TotalItemsProcessed,
ItemsTotal = pullResponse.TotalRequestItems,
QueryId = pullResponse.QueryId,
Exception = pullResponse.Exception,
ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null
});
}
}

if (pullResponse.Completed)
catch (Exception ex)
{
// An exception is thrown in the local processing section of the pull operation. We can't
// handle it properly, so we add it to the result and send a synchronization event to allow
// the developer to capture the exception.
result.AddLocalException(currentMetadata, ex);
context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.PullEnded,
EventType = SynchronizationEventType.LocalException,
EntityType = pullResponse.EntityType,
ItemsProcessed = pullResponse.TotalItemsProcessed,
ItemsTotal = pullResponse.TotalRequestItems,
QueryId = pullResponse.QueryId,
Exception = pullResponse.Exception,
ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null
Exception = ex,
EntityMetadata = currentMetadata
});
}
});
Expand Down Expand Up @@ -189,6 +211,20 @@
result.AddFailedRequest(requestUri, ex.ServiceResponse);
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, [], totalCount, itemsProcessed, true, ex));
}
catch (Exception localex)
{
// An exception is thrown that is locally generated. We can't handle it properly, so we
// add it to the result and send a synchronization event to allow the developer to capture
// the exception.
result.AddLocalException(null, localex);
context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.LocalException,
EntityType = pullRequest.EntityType,
QueryId = pullRequest.QueryId,
Exception = localex
});
}
});

// Get requests we need to enqueue. Note : do not enqueue them yet. Context only supports one outstanding query at a time and we don't want a query from a background task being run concurrently with GetDeltaTokenAsync.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using CommunityToolkit.Datasync.Client.Serialization;

namespace CommunityToolkit.Datasync.Client.Offline;

/// <summary>
Expand All @@ -14,6 +16,7 @@ public enum SynchronizationEventType
/// </summary>
/// <remarks><see cref="SynchronizationEventArgs.ItemsProcessed"/> is not yet known here</remarks>
PullStarted,

/// <summary>
/// Occurs when items have been successfully fetched from the server.
/// </summary>
Expand All @@ -30,18 +33,26 @@ public enum SynchronizationEventType
/// Pull for the given entity ended.
/// </summary>
PullEnded,

/// <summary>
/// Push operation started.
/// </summary>
PushStarted,

/// <summary>
/// An item was pushed to the server
/// </summary>
PushItem,

/// <summary>
/// Push operation ended.
/// </summary>
PushEnded,

/// <summary>
/// A local exception was thrown during pull operations.
/// </summary>
LocalException,
}

/// <summary>
Expand Down Expand Up @@ -94,4 +105,9 @@ public class SynchronizationEventArgs
/// The operation that was executed. Not used on pull events.
/// </summary>
public DatasyncOperation? PushOperation { get; init; }

/// <summary>
/// The local metadata of the entity being modified during local exceptions.
/// </summary>
public EntityMetadata? EntityMetadata { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace CommunityToolkit.Datasync.Client.Serialization;
/// <summary>
/// A representation of just the metadata for an entity.
/// </summary>
internal class EntityMetadata
public class EntityMetadata
{
/// <summary>
/// The globally unique ID of the entity.
Expand Down