Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add connectionToken property from IConnection #714

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/SuperSocket.Connection/ConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
namespace SuperSocket.Connection
{
public abstract class ConnectionBase : IConnection
{
{
public abstract IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter);

public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default);

public bool IsClosed { get; private set; }
Expand All @@ -28,6 +28,8 @@ public abstract class ConnectionBase : IConnection

public DateTimeOffset LastActiveTime { get; protected set; } = DateTimeOffset.Now;

public CancellationToken ConnectionToken { get; protected set; }

protected virtual void OnClosed()
{
IsClosed = true;
Expand Down
2 changes: 2 additions & 0 deletions src/SuperSocket.Connection/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface IConnection

EndPoint LocalEndPoint { get; }

CancellationToken ConnectionToken { get; }

DateTimeOffset LastActiveTime { get; }

ValueTask DetachAsync();
Expand Down
21 changes: 11 additions & 10 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PipeReader IPipeConnection.InputReader
{
get { return InputReader; }
}

IPipelineFilter IPipeConnection.PipelineFilter
{
get { return _pipelineFilter; }
Expand All @@ -52,6 +52,7 @@ protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, Co
Logger = options.Logger;
InputReader = inputReader;
OutputWriter = outputWriter;
ConnectionToken = _cts.Token;
}

protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe)
Copy link
Owner

@kerryjiang kerryjiang Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_cts.Token can be replaced with ConnectionToken in this file. And then we will see how to improve _cts and we should let it work together with KestrelPipeConnection.

Expand All @@ -72,7 +73,7 @@ public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPip

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;

_pipeTask = StartTask(packagePipe);

_ = HandleClosing();
Expand Down Expand Up @@ -118,7 +119,7 @@ private async ValueTask HandleClosing()
{
if (!IsIgnorableException(exc))
OnError("Unhandled exception in the method PipeChannel.Close.", exc);
}
}
}
}
}
Expand Down Expand Up @@ -172,7 +173,7 @@ public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Cancellat
finally
{
SendLock.Release();
}
}
}

private void WriteBuffer(PipeWriter writer, ReadOnlyMemory<byte> buffer)
Expand All @@ -195,7 +196,7 @@ public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> pa
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default)
{
try
{
Expand Down Expand Up @@ -236,7 +237,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
if (!IsIgnorableException(e) && !(e is OperationCanceledException))
OnError("Failed to read from the pipe", e);

break;
}

Expand Down Expand Up @@ -267,7 +268,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
completed = true;
break;
}
}
}

if (completed)
Expand Down Expand Up @@ -344,7 +345,7 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
Close();
return false;
}

if (packageInfo == null)
{
// the current pipeline filter needs more data to process
Expand All @@ -370,12 +371,12 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
examined = consumed = buffer.End;
return true;
}

if (bytesConsumed > 0)
seqReader = new SequenceReader<byte>(seqReader.Sequence.Slice(bytesConsumed));
}
}

public override async ValueTask DetachAsync()
{
_isDetaching = true;
Expand Down
12 changes: 7 additions & 5 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ protected override void OnClosed()
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;


Cancel();
base.OnClosed();
}

Expand All @@ -55,21 +56,22 @@ protected override void OnInputPipeRead(ReadResult result)
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default)
{
await base.SendAsync(write, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await base.SendAsync(buffer, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package,
CancellationToken cancellationToken = default)
{
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
}
}
}
42 changes: 19 additions & 23 deletions src/SuperSocket.Server/SuperSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ protected virtual ValueTask OnSessionClosedAsync(IAppSession session, CloseEvent
if (closedHandler != null)
return closedHandler.Invoke(session, e);

#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual async ValueTask FireSessionConnectedEvent(AppSession session)
Expand Down Expand Up @@ -350,7 +350,7 @@ protected virtual async ValueTask FireSessionClosedEvent(AppSession session, Clo
if (!handshakeSession.Handshaked)
return;
}

await UnRegisterSessionFromMiddlewares(session);

_logger.LogInformation($"The session disconnected: {session.SessionID} ({reason})");
Expand Down Expand Up @@ -396,18 +396,18 @@ private async ValueTask HandleSession(AppSession session, IConnection connection
var packageHandlingScheduler = _packageHandlingScheduler;

#if NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif

await foreach (var p in packageStream)
{
if(_packageHandlingContextAccessor != null)
if (_packageHandlingContextAccessor != null)
{
_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
}

#if !NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif
await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token);

Expand All @@ -424,13 +424,9 @@ private async ValueTask HandleSession(AppSession session, IConnection connection

protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
return CancellationTokenSourcePool.Shared.Rent(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
#else
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
return cancellationTokenSource;
#endif
}

protected virtual ValueTask<bool> OnSessionErrorAsync(IAppSession session, PackageHandlingException<TReceivePackageInfo> exception)
Expand Down Expand Up @@ -471,20 +467,20 @@ public async Task StartAsync(CancellationToken cancellationToken)

protected virtual ValueTask OnStartedAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual ValueTask OnStopAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

private async Task StopListener(IConnectionListener listener)
Expand Down