Skip to content

Commit

Permalink
Merge pull request #21 from ooonush/dev
Browse files Browse the repository at this point in the history
Version 1.1.0
  • Loading branch information
ooonush authored Jan 26, 2024
2 parents 2575fe0 + aa8b1a7 commit f973aef
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,51 +221,77 @@ public bool PushMessage(ArraySegment<byte> message)
/// could lead to a corrupted queue.
/// </remarks>
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
/// <param name="softMaxBytes">
/// Maximum number of bytes to copy (0 means writer capacity). This is a soft limit only.
/// If a message is larger than that but fits in the writer, it will be written. In effect,
/// this parameter is the maximum size that small messages can be coalesced together.
/// </param>
/// <returns>How many bytes were written to the writer.</returns>
public int FillWriterWithMessages(ref DataStreamWriter writer)
public int FillWriterWithMessages(ref DataStreamWriter writer, int softMaxBytes = 0)
{
if (!IsCreated || Length == 0)
{
return 0;
}

softMaxBytes = softMaxBytes == 0 ? writer.Capacity : Math.Min(softMaxBytes, writer.Capacity);

unsafe
{
var reader = new DataStreamReader(m_Data.AsArray());

var writerAvailable = writer.Capacity;
var readerOffset = HeadIndex;

while (readerOffset < TailIndex)
reader.SeekSet(readerOffset);
var messageLength = reader.ReadInt();
var bytesToWrite = messageLength + sizeof(int);

// Our behavior here depends on the size of the first message in the queue. If it's
// larger than the soft limit, then add only that message to the writer (we want
// large payloads to be fragmented on their own). Otherwise coalesce all small
// messages until we hit the soft limit (which presumably means they won't be
// fragmented, which is the desired behavior for smaller messages).

if (bytesToWrite > softMaxBytes && bytesToWrite <= writer.Capacity)
{
writer.WriteInt(messageLength);
WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + reader.GetBytesRead(), messageLength);

return bytesToWrite;
}
else
{
reader.SeekSet(readerOffset);
var messageLength = reader.ReadInt();
var bytesWritten = 0;

if (writerAvailable < sizeof(int) + messageLength)
while (readerOffset < TailIndex)
{
break;
reader.SeekSet(readerOffset);
messageLength = reader.ReadInt();
bytesToWrite = messageLength + sizeof(int);

if (bytesWritten + bytesToWrite <= softMaxBytes)
{
writer.WriteInt(messageLength);
WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + reader.GetBytesRead(), messageLength);

readerOffset += bytesToWrite;
bytesWritten += bytesToWrite;
}
else
{
break;
}
}
else
{
writer.WriteInt(messageLength);

var messageOffset = reader.GetBytesRead();
WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);

writerAvailable -= sizeof(int) + messageLength;
readerOffset += sizeof(int) + messageLength;
}
return bytesWritten;
}

return writer.Capacity - writerAvailable;
}
}

/// <summary>
/// Fill the given <see cref="DataStreamWriter"/> with as many bytes from the queue as
/// possible, disregarding message boundaries.
/// </summary>
///<remarks>
/// <remarks>
/// This does NOT actually consume anything from the queue. That is, calling this method
/// does not reduce the length of the queue. Callers are expected to call
/// <see cref="Consume"/> with the value returned by this method afterwards if the data can
Expand All @@ -275,15 +301,17 @@ public int FillWriterWithMessages(ref DataStreamWriter writer)
/// this could lead to reading messages from a corrupted queue.
/// </remarks>
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
/// <param name="maxBytes">Max number of bytes to copy (0 means writer capacity).</param>
/// <returns>How many bytes were written to the writer.</returns>
public int FillWriterWithBytes(ref DataStreamWriter writer)
public int FillWriterWithBytes(ref DataStreamWriter writer, int maxBytes = 0)
{
if (!IsCreated || Length == 0)
{
return 0;
}

var copyLength = Math.Min(writer.Capacity, Length);
var maxLength = maxBytes == 0 ? writer.Capacity : Math.Min(maxBytes, writer.Capacity);
var copyLength = Math.Min(maxLength, Length);

unsafe
{
Expand Down
86 changes: 62 additions & 24 deletions Assets/FishNet/Plugins/FishyUnityTransport/FishyUnityTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ public INetworkStreamDriverConstructor DriverConstructor
[SerializeField]
private ProtocolType m_ProtocolType;

#if UTP_TRANSPORT_2_0_ABOVE
[Tooltip("Per default the client/server will communicate over UDP. Set to true to communicate with WebSocket.")]
[SerializeField]
#if !UTP_TRANSPORT_2_0_ABOVE
[HideInInspector]
#endif
private bool m_UseWebSockets = false;

public bool UseWebSockets
Expand All @@ -158,13 +160,15 @@ public bool UseWebSockets
/// </summary>
[Tooltip("Per default the client/server communication will not be encrypted. Select true to enable DTLS for UDP and TLS for Websocket.")]
[SerializeField]
#if !UTP_TRANSPORT_2_0_ABOVE
[HideInInspector]
#endif
private bool m_UseEncryption = false;
public bool UseEncryption
{
get => m_UseEncryption;
set => m_UseEncryption = value;
}
#endif

[Tooltip("The maximum amount of packets that can be in the internal send/receive queues. Basically this is how many packets can be sent/received in a single update/frame.")]
[SerializeField]
Expand Down Expand Up @@ -748,15 +752,7 @@ private bool AcceptConnection()
return false;
}

if (NetworkManager.ServerManager.Clients.Count >= GetMaximumClients())
{
DisconnectRemoteClient(ParseClientId(connection));
}
else
{
HandleRemoteConnectionState(RemoteConnectionState.Started, ParseClientId(connection));
}

HandleRemoteConnectionState(RemoteConnectionState.Started, ParseClientId(connection));
return true;
}

Expand Down Expand Up @@ -868,10 +864,7 @@ private void IterateIncoming()
if (m_ProtocolType == ProtocolType.RelayUnityTransport && m_Driver.GetRelayConnectionStatus() == RelayConnectionStatus.AllocationInvalid)
{
Debug.LogError("Transport failure! Relay allocation needs to be recreated, and NetworkManager restarted. ");
// + "Use NetworkManager.OnTransportFailure to be notified of such events programmatically.");

// TODO
// InvokeOnTransportEvent(TransportFailure);
Shutdown();
return;
}

Expand Down Expand Up @@ -971,6 +964,7 @@ private void DisconnectLocalClient()
SetClientConnectionState(LocalConnectionState.Stopped);
}
}
Disconnect();
}

/// <summary>
Expand Down Expand Up @@ -1023,7 +1017,13 @@ private void InitializeNetworkSettings()
var fragmentationCapacity = m_MaxPayloadSize + BatchedSendQueue.PerMessageOverhead;
m_NetworkSettings.WithFragmentationStageParameters(payloadCapacity: fragmentationCapacity);

m_NetworkSettings.WithReliableStageParameters(windowSize: 64);
m_NetworkSettings.WithReliableStageParameters(
windowSize: 64
#if UTP_TRANSPORT_2_0_ABOVE
,
maximumResendTime: m_ProtocolType == ProtocolType.RelayUnityTransport ? 750 : 500
#endif
);

#if !UTP_TRANSPORT_2_0_ABOVE && !UNITY_WEBGL
m_NetworkSettings.WithBaselibNetworkInterfaceParameters(
Expand Down Expand Up @@ -1089,8 +1089,6 @@ private void Send(ulong clientId, ArraySegment<byte> payload, Channel channel)
else
{
DisconnectRemoteClient(clientId);

HandleRemoteConnectionState(RemoteConnectionState.Stopped, clientId);
}
}
else
Expand Down Expand Up @@ -1368,6 +1366,21 @@ public void CreateDriver(FishyUnityTransport transport, out NetworkDriver driver
}
#endif

#if UTP_TRANSPORT_2_1_ABOVE
if (m_ProtocolType == ProtocolType.RelayUnityTransport)
{
if (m_UseWebSockets && m_RelayServerData.IsWebSocket == 0)
{
Debug.LogError("Transport is configured to use WebSockets, but Relay server data isn't. Be sure to use \"wss\" as the connection type when creating the server data (instead of \"dtls\" or \"udp\").");
}

if (!m_UseWebSockets && m_RelayServerData.IsWebSocket != 0)
{
Debug.LogError("Relay server data indicates usage of WebSockets, but \"Use WebSockets\" checkbox isn't checked under \"Unity Transport\" component.");
}
}
#endif

#if UTP_TRANSPORT_2_0_ABOVE
if (m_UseWebSockets)
{
Expand Down Expand Up @@ -1612,6 +1625,8 @@ public override void HandleServerReceivedDataArgs(ServerReceivedDataArgs receive

public override void SendToServer(byte channelId, ArraySegment<byte> segment)
{
if (m_ClientState != LocalConnectionState.Started) return;

if (m_ServerState == LocalConnectionState.Started)
{
ClientHostSendToServer(channelId, segment);
Expand All @@ -1624,6 +1639,8 @@ public override void SendToServer(byte channelId, ArraySegment<byte> segment)

public override void SendToClient(byte channelId, ArraySegment<byte> segment, int connectionId)
{
if (m_ServerState != LocalConnectionState.Started) return;

ulong transportId = ClientIdToTransportId(connectionId);
if (m_ClientState == LocalConnectionState.Started && transportId == m_ServerClientId)
{
Expand Down Expand Up @@ -1730,10 +1747,25 @@ private void HandleRemoteConnectionState(RemoteConnectionState state, ulong clie
switch (state)
{
case RemoteConnectionState.Started:
transportId = m_NextClientId++;
m_TransportIdToClientIdMap[transportId] = clientId;
m_ClientIdToTransportIdMap[clientId] = transportId;
HandleRemoteConnectionState(new RemoteConnectionStateArgs(state, transportId, Index));
if (m_TransportIdToClientIdMap.Count >= GetMaximumClients())
{
Debug.LogWarning("Connection limit reached. Server cannot accept new connections.");
// The server can disconnect the client at any time, even during ProcessEvent(), which can cause errors because the client still has Network Events.
// This workaround simply clears the Event queue for the client.
NetworkConnection connection = ParseClientId(clientId);
if (m_Driver.GetConnectionState(connection) != NetworkConnection.State.Disconnected)
{
m_Driver.Disconnect(connection);
while (m_Driver.PopEventForConnection(connection, out var _) != NetworkEvent.Type.Empty) { }
}
}
else
{
transportId = m_NextClientId++;
m_TransportIdToClientIdMap[transportId] = clientId;
m_ClientIdToTransportIdMap[clientId] = transportId;
HandleRemoteConnectionState(new RemoteConnectionStateArgs(state, transportId, Index));
}
break;
case RemoteConnectionState.Stopped:
transportId = m_ClientIdToTransportIdMap[clientId];
Expand Down Expand Up @@ -1806,7 +1838,6 @@ private bool StopClient()
}

DisconnectLocalClient();
Disconnect();

return true;
}
Expand Down Expand Up @@ -1842,9 +1873,16 @@ private bool StartClientHost()
if (m_ServerState == LocalConnectionState.Started || m_ServerState == LocalConnectionState.Starting)
{
SetClientConnectionState(LocalConnectionState.Starting);
if (m_TransportIdToClientIdMap.Count >= GetMaximumClients())
{
SetClientConnectionState(LocalConnectionState.Stopping);
Debug.LogWarning("Connection limit reached. Server cannot accept new connections.");
SetClientConnectionState(LocalConnectionState.Stopped);
return false;
}
m_ServerClientId = k_ClientHostId;
SetClientConnectionState(LocalConnectionState.Started);
HandleRemoteConnectionState(RemoteConnectionState.Started, m_ServerClientId);
SetClientConnectionState(LocalConnectionState.Started);
return true;
}
return false;
Expand Down
Loading

0 comments on commit f973aef

Please sign in to comment.