Skip to content

Commit

Permalink
added tracing for entities
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophia Tevosyan committed Mar 6, 2025
1 parent 7cfd59a commit f76b7d5
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,17 @@ private async Task SignalEntityAsyncInternal(DurableClient durableClient, string
var guid = Guid.NewGuid(); // unique id for this request
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
var instance = new OrchestrationInstance() { InstanceId = instanceId };

using var signalEntityActivity = TraceHelper.StartActivityForCallingOrSignalingEntity(instanceId, entityId.EntityName, operationName, true, Activity.Current?.Context);

var request = new RequestMessage()
{
ParentInstanceId = null, // means this was sent by a client
ParentExecutionId = null,
ParentTraceId = signalEntityActivity.TraceId.ToString(),
ParentSpanId = signalEntityActivity.SpanId.ToString(),
ParentTraceFlags = signalEntityActivity.ActivityTraceFlags,
ParentTraceState = signalEntityActivity.TraceStateString,
Id = guid,
IsSignal = true,
Operation = operationName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Runtime.ExceptionServices;
Expand Down Expand Up @@ -424,10 +425,16 @@ private void SignalEntityInternal(EntityId entity, DateTime? scheduledTimeUtc, s
{
InstanceId = EntityId.GetSchedulerIdFromEntityId(entity),
};

using var signalEntityActivity = TraceHelper.StartActivityForCallingOrSignalingEntity(target.InstanceId, entity.EntityName, operation, true, Activity.Current?.Context, this.InstanceId);
var request = new RequestMessage()
{
ParentInstanceId = this.InstanceId,
ParentExecutionId = null, // for entities, message sorter persists across executions
ParentTraceId = signalEntityActivity.TraceId.ToString(),
ParentSpanId = signalEntityActivity.SpanId.ToString(),
ParentTraceFlags = signalEntityActivity.ActivityTraceFlags,
ParentTraceState = signalEntityActivity.TraceStateString,
Id = Guid.NewGuid(),
IsSignal = true,
Operation = operation,
Expand All @@ -437,7 +444,7 @@ private void SignalEntityInternal(EntityId entity, DateTime? scheduledTimeUtc, s
{
request.SetInput(input, this.messageDataConverter);
}


this.SendOperationMessage(target, request);

this.Config.TraceHelper.FunctionScheduled(
Expand All @@ -462,13 +469,19 @@ string IDurableEntityContext.StartNewOrchestration(string functionName, object i
throw new ArgumentException(nameof(instanceId), "Orchestration instance ids must not start with @");
}

using var startOrchestrationActivity = TraceHelper.StartActivityForEntityStartingAnOrchestration(EntityId.GetSchedulerIdFromEntityId(this.self), instanceId, Activity.Current?.Context);

lock (this.outbox)
{
this.outbox.Add(new FireAndForgetMessage()
{
InstanceId = instanceId,
FunctionName = functionName,
Input = input,
ParentTraceId = startOrchestrationActivity.TraceId.ToString(),
ParentSpanId = startOrchestrationActivity.SpanId.ToString(),
ParentTraceFlags = startOrchestrationActivity.ActivityTraceFlags,
ParentTraceState = startOrchestrationActivity.TraceStateString,
});
}

Expand Down Expand Up @@ -709,7 +722,11 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
DurableOrchestrationContext.DefaultVersion,
fireAndForgetMessage.InstanceId,
fireAndForgetMessage.Input,
new Dictionary<string, string>() { { OrchestrationTags.FireAndForget, "" } });
new Dictionary<string, string>() { { OrchestrationTags.FireAndForget, "" } },
fireAndForgetMessage.ParentTraceId,
fireAndForgetMessage.ParentSpanId,
fireAndForgetMessage.ParentTraceFlags,
fireAndForgetMessage.ParentTraceState);

System.Diagnostics.Debug.Assert(dummyTask.IsCompleted, "task should be fire-and-forget");
}
Expand Down Expand Up @@ -776,6 +793,14 @@ private class FireAndForgetMessage : OutgoingMessage
public string FunctionName { get; set; }

public object Input { get; set; }

public string ParentTraceId { get; set; }

public string ParentSpanId { get; set; }

public ActivityTraceFlags ParentTraceFlags { get; set; }

public string ParentTraceState { get; set; }
}

private class OperationMessage : OutgoingMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand Down Expand Up @@ -690,11 +691,22 @@ internal async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
request.SetInput(input, this.messageDataConverter);
}

this.SendEntityMessage(target, request);

if (!oneWay)
using (var callOrSignalEntityActivity = !this.IsReplaying ? TraceHelper.StartActivityForCallingOrSignalingEntity(instanceId, EntityId.GetEntityIdFromSchedulerId(instanceId).EntityName, operation, oneWay, Activity.Current?.Context) : null)
{
callTask = this.WaitForEntityResponse<TResult>(guid, lockToUse);
if (callOrSignalEntityActivity != null)
{
request.ParentTraceId = callOrSignalEntityActivity.TraceId.ToString();
request.ParentSpanId = callOrSignalEntityActivity.SpanId.ToString();
request.ParentTraceFlags = callOrSignalEntityActivity.ActivityTraceFlags;
request.ParentTraceState = callOrSignalEntityActivity.TraceStateString;
}

this.SendEntityMessage(target, request);

if (!oneWay)
{
callTask = this.WaitForEntityResponse<TResult>(guid, lockToUse);
}
}

break;
Expand Down
16 changes: 16 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Correlation/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ internal static class Task
internal const string FireAt = "durabletask.fire_at";
}

// Should these have "entity" prefixes?
internal static class Entity
{
internal const string Type = Task.Type;
internal const string EntityId = "durabletask.entity.entity_id";
internal const string TargetInstanceId = "durabletask.entity.target_instance_id";
internal const string TargetEntityId = "durabletask.entity.target_entity_id";
internal const string EntityOperation = "durabletask.entity.entity_operation";
}

internal static class Status
{
internal const string Code = "otel.status_code";
Expand All @@ -27,6 +37,12 @@ internal static class Status

internal static class SpanNames
{
internal static string CallOrSignalEntity(string name, string operation)
=> $"{TraceActivityConstants.Entity}:{name}:{operation}";

internal static string EntityStartsAnOrchestration()
=> $"{TraceActivityConstants.Entity}:{TraceActivityConstants.CreateOrchestration}";

internal static string CreateOrchestration(string name, string? version)
=> FormatName(TraceActivityConstants.CreateOrchestration, name, version);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal class TraceActivityConstants
Expand All @@ -10,8 +12,12 @@ internal class TraceActivityConstants
public const string Activity = "activity";
public const string Event = "event";
public const string Timer = "timer";
public const string Entity = "entity";

public const string CreateOrchestration = "create_orchestration";
public const string OrchestrationEvent = "orchestration_event";

public const string CallEntity = "CallEntity";
public const string SignalEntity = "SignalEntity";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Diagnostics;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -78,7 +79,31 @@ internal class RequestMessage
/// For lock requests involving multiple locks, the message number.
/// </summary>
[JsonProperty(PropertyName = "pos", DefaultValueHandling = DefaultValueHandling.Ignore)]
public int Position { get; set; }
public int Position { get; set; }

/// <summary>
/// The parent trace that called this operation.
/// </summary>
[JsonProperty(PropertyName = "parentTrace", DefaultValueHandling = DefaultValueHandling.Ignore)]
public string ParentTraceId { get; set; }

/// <summary>
/// The parent span that called this operation.
/// </summary>
[JsonProperty(PropertyName = "parentSpan", DefaultValueHandling = DefaultValueHandling.Ignore)]
public string ParentSpanId { get; set; }

/// <summary>
/// The trace flags of the parent that called this operation.
/// </summary>
[JsonProperty(PropertyName = "parentTraceFlags", DefaultValueHandling = DefaultValueHandling.Ignore)]
public ActivityTraceFlags ParentTraceFlags { get; set; }

/// <summary>
/// The trace state of the parent that called this operation.
/// </summary>
[JsonProperty(PropertyName = "parentTraceState", DefaultValueHandling = DefaultValueHandling.Ignore)]
public string ParentTraceState { get; set; }

[JsonIgnore]
public bool IsLockRequest => this.LockSet != null;
Expand Down
Loading

0 comments on commit f76b7d5

Please sign in to comment.