Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public Camera VideoSceneInput

#endregion

public string SessionId { get; private set; }
public string SessionId { get; private set; } = "(empty)";

public RtcSession(SfuWebSocket sfuWebSocket, Func<IStreamCall, HttpClient> httpClientFactory, ILogs logs,
ISerializer serializer, ITimeService timeService,
Expand Down
31 changes: 20 additions & 11 deletions Packages/StreamVideo/Runtime/Core/Models/CallSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
using StreamVideo.Core.State;
using StreamVideo.Core.State.Caches;
using StreamVideo.Core.StatefulModels;
using StreamVideo.Core.Utils;
using SfuCallState = StreamVideo.v1.Sfu.Models.CallState;
using SfuParticipant = StreamVideo.v1.Sfu.Models.Participant;
using SfuParticipantCount = StreamVideo.v1.Sfu.Models.ParticipantCount;

namespace StreamVideo.Core.Models
Expand Down Expand Up @@ -56,14 +54,18 @@ void IStateLoadableFrom<CallSessionResponseInternalDTO, CallSession>.LoadFromDto

// CallSessionResponseInternalDTO usually (or always?) contains no participants. Participants are updated from the SFU join response
// But SFU response can arrive before API response, so we can't override participants here because this clears the list
foreach (var dtoParticipant in dto.Participants)
{
var participant = cache.TryCreateOrUpdate(dtoParticipant);
if (!_participants.Contains(participant))
{
_participants.Add(participant);
}
}

//StreamTODO: temp remove this. This seems to be only messing up the participants list. We're testing updating the participants only based on SFU data.
// But we need to check how this will work with GetCall where there's not SFU connection

// foreach (var dtoParticipant in dto.Participants)
// {
// var participant = cache.TryCreateOrUpdate(dtoParticipant);
// if (!_participants.Contains(participant))
// {
// _participants.Add(participant);
// }
// }

// StreamTODO: figure out how to best handle this. Should we update it from coordinator or only the SFU
//_participantsCountByRole.TryReplaceValuesFromDto(dto.ParticipantsCountByRole);
Expand All @@ -82,6 +84,9 @@ void IStateLoadableFrom<SfuCallState, CallSession>.LoadFromDto(SfuCallState dto,
StartedAt = dto.StartedAt.ToDateTimeOffset();
}

// Treat SFU as the most updated source of truth for participants
_participants.Clear();

// dto.CallState.Participants may not contain all participants
foreach (var dtoParticipant in dto.Participants)
{
Expand Down Expand Up @@ -117,7 +122,11 @@ internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cach
internal (string sessionId, string userId) UpdateFromSfu(ParticipantLeft participantLeft, ICache cache)
{
var participant = cache.TryCreateOrUpdate(participantLeft.Participant);
_participants.Remove(participant);

if (!participant.IsLocalParticipant)
{
_participants.Remove(participant);
}

return (participantLeft.Participant.SessionId, participantLeft.Participant.UserId);
}
Expand Down
122 changes: 92 additions & 30 deletions Packages/StreamVideo/Runtime/Core/StatefulModels/StreamCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal sealed class StreamCall : StreamStatefulModelBase<StreamCall>,

//StreamTodo: Maybe add OtherParticipants -> All participants except for the local participant?
public IReadOnlyList<IStreamVideoCallParticipant> Participants => Session?.Participants;

public ParticipantCount ParticipantCount => Session.ParticipantCount;

public bool IsLocalUserOwner
Expand Down Expand Up @@ -453,31 +453,41 @@ public IStreamVideoCallParticipant GetLocalParticipant()
{
tempSb.AppendLine(log);
}

Logs.Error(tempSb.ToString());
}
throw new InvalidOperationException("No participants in the call.");

return null;
}

var localParticipant = Participants.FirstOrDefault(p => p.IsLocalParticipant);
if (localParticipant == null)
{
using (new StringBuilderPoolScope(out var sb))
try
{
var currentSessionId = LowLevelClient.RtcSession.SessionId;
sb.AppendLine($"Local participant not found. Local Session ID: {currentSessionId}. Participants in the call:");
foreach (var p in Participants)
using (new StringBuilderPoolScope(out var sb))
{
sb.AppendLine($" - UserId: {p.UserId}, SessionId: {p.SessionId}, IsLocalParticipant: {p.IsLocalParticipant}");
}
var currentSessionId = LowLevelClient.RtcSession.SessionId;
sb.AppendLine(
$"Local participant not found. Local Session ID: {currentSessionId}. Participants in the call:");
foreach (var p in Participants)
{
sb.AppendLine(
$" - UserId: {p.UserId}, SessionId: {p.SessionId}, IsLocalParticipant: {p.IsLocalParticipant}");
}

sb.AppendLine("Last operations leading to this state:");
foreach (var log in _tempLogs.GetLogs())
{
sb.AppendLine(log);
sb.AppendLine("Last operations leading to this state:");
foreach (var log in _tempLogs.GetLogs())
{
sb.AppendLine(log);
}

Logs.Error(sb.ToString());
}

Logs.Error(sb.ToString());
}
catch (Exception e)
{
Logs.Warning($"Error while generating log for {nameof(GetLocalParticipant)}: " + e.Message);
}
}

Expand All @@ -487,6 +497,8 @@ public IStreamVideoCallParticipant GetLocalParticipant()
void IUpdateableFrom<CallResponseInternalDTO, StreamCall>.UpdateFromDto(CallResponseInternalDTO dto,
ICache cache)
{
var wasBefore = IsLocalParticipantIncluded();

Backstage = dto.Backstage;
_blockedUserIds.TryReplaceValuesFromDto(dto.BlockedUserIds);
Cid = dto.Cid;
Expand All @@ -506,27 +518,33 @@ void IUpdateableFrom<CallResponseInternalDTO, StreamCall>.UpdateFromDto(CallResp
Type = new StreamCallType(dto.Type);
UpdatedAt = dto.UpdatedAt;

var isAfter = IsLocalParticipantIncluded();

try
{
var localParticipantId = LowLevelClient.RtcSession.SessionId;
// Ignore the IDE warning, this can be null
if (dto.Session != null)
{
using (new StringBuilderPoolScope(out var tempSb))
{
tempSb.Append($"`UpdateFromDto(CallResponseInternalDTO dto` - dto participants: {dto.Session.Participants?.Count}, call participants: {Session.Participants.Count}. Dto participants: ");
tempSb.Append(
$"`UpdateFromDto(CallResponseInternalDTO dto` - dto participants: {dto.Session.Participants?.Count}, call participants: {Session.Participants.Count}. ");
tempSb.Append(
$"IsLocalParticipantIncluded ({localParticipantId}) before: {wasBefore}, after: {isAfter}. ");
tempSb.Append("Dto participants:");
foreach (var p in dto.Session.Participants)
{
tempSb.Append($"[UserSessionId: {p.UserSessionId}, SessionId: {p.User?.Id}");
tempSb.Append($"[UserSessionId: {p.UserSessionId}, SessionId: {p.User?.Id}, ");
}

_tempLogs.Add(tempSb.ToString());
}
}

}
catch (Exception e)
{
Logs.Exception(e);
Logs.Warning("Failed to log participants in UpdateFromDto: " + e.Message);
}

// Depends on Session.Participants so load as last
Expand Down Expand Up @@ -595,21 +613,39 @@ internal StreamCall(string uniqueId, ICacheRepository<StreamCall> repository,
//StreamTodo: solve with a generic interface and best to be handled by cache layer
internal void UpdateFromSfu(JoinResponse joinResponse)
{
var wasBefore = IsLocalParticipantIncluded();

((IStateLoadableFrom<CallState, CallSession>)Session).LoadFromDto(joinResponse.CallState, Cache);
UpdateServerPins(joinResponse.CallState.Pins);

var isAfter = IsLocalParticipantIncluded();

try
{
var localParticipantId = LowLevelClient.RtcSession.SessionId;
using (new StringBuilderPoolScope(out var tempSb))
{
tempSb.Append("`UpdateFromSfu(JoinResponse joinResponse)` - ");
tempSb.Append(
$"IsLocalParticipantIncluded ({localParticipantId}) before: {wasBefore}, after: {isAfter}. ");
tempSb.Append("`UpdateFromSfu(JoinResponse joinResponse)` - joinResponse participants: ");
if(joinResponse.CallState !=null && joinResponse.CallState.Participants != null)
if (joinResponse.CallState != null && joinResponse.CallState.Participants != null)
{
foreach (var p in joinResponse.CallState.Participants)
{
tempSb.Append($"[UserId: {p.UserId}, SessionId: {p.SessionId}, ");
}
}
else
{
tempSb.Append("joinResponse.CallState not null:");
tempSb.Append(joinResponse.CallState != null);
tempSb.Append("joinResponse.CallState.Participants not null: ");
tempSb.Append(joinResponse.CallState?.Participants != null);
tempSb.Append("count: ");
tempSb.Append(joinResponse.CallState?.Participants?.Count);
}

_tempLogs.Add(tempSb.ToString());
}
}
Expand All @@ -628,6 +664,20 @@ internal void UpdateFromSfu(ParticipantJoined participantJoined, ICache cache)

internal void UpdateFromSfu(ParticipantLeft participantLeft, ICache cache)
{
try
{
var p = cache.TryCreateOrUpdate(participantLeft.Participant);
if (p.IsLocalParticipant)
{
_tempLogs.Add(
"`UpdateFromSfu(ParticipantLeft participantLeft)` - ERROR - local participant is leaving the call.");
}
}
catch (Exception e)
{
Logs.Warning("Error when generating debug log: " + e.Message);
}

var participant = Session.UpdateFromSfu(participantLeft, cache);

_localPinsSessionIds.RemoveAll(participant.sessionId);
Expand Down Expand Up @@ -666,23 +716,23 @@ internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cach
{
Session?.UpdateFromSfu(healthCheckResponse, cache);
}

internal void UpdateFromCoordinator(CallSessionParticipantCountsUpdatedEventInternalDTO eventData)
{
Session?.UpdateFromCoordinator(eventData, Client.InternalLowLevelClient.RtcSession.CallState);
}

internal void UpdateFromCoordinator(CallSessionParticipantJoinedEventInternalDTO eventData, ICache cache)
{
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);

//StreamTodo: we should extract AddParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
}

internal void UpdateFromCoordinator(CallSessionParticipantLeftEventInternalDTO eventData, ICache cache)
{
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);

//StreamTodo: we should extract RemoveParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
}

Expand Down Expand Up @@ -759,12 +809,14 @@ internal void InternalHandleCallRecordingStartedEvent(CallReactionEventInternalD

//StreamTodo: NullReferenceException here because _client is never set
var participant
= Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.FirstOrDefault(p => p.UserId == reaction.User.Id);
= Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.FirstOrDefault(p
=> p.UserId == reaction.User.Id);
if (participant == null)
{
Logs.ErrorIfDebug(
$"Failed to find participant for reaction. UserId: {reaction.User.Id}, Participants: " +
string.Join(", ", Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
string.Join(", ",
Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
return;
}

Expand Down Expand Up @@ -928,7 +980,7 @@ private void UpdateCapabilitiesByRole(Dictionary<string, List<string>> capabilit
tempRolesToRemove.Add(role);
}
}

foreach (var role in tempRolesToRemove)
{
_capabilitiesByRole.Remove(role);
Expand Down Expand Up @@ -987,5 +1039,15 @@ private void GetOrCreateParticipantsCustomDataSection(IStreamVideoCallParticipan

participantCustomData = allParticipantsCustomData[participant.SessionId];
}

private bool IsLocalParticipantIncluded()
{
if (Session == null || Session.Participants == null || Session.Participants.Count == 0)
{
return false;
}

return Session.Participants.FirstOrDefault(p => p.IsLocalParticipant) != null;
}
}
}
2 changes: 1 addition & 1 deletion Packages/StreamVideo/Runtime/Core/Utils/DebugLogBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace StreamVideo.Core.Utils
{
internal class DebugLogBuffer
{
private const int MaxSize = 10;
private const int MaxSize = 15;
private readonly string[] _buffer = new string[MaxSize];
private int _index;
private int _count;
Expand Down
Loading