Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions Gofer.NET.Tests/GivenATaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;
using Gofer.NET.Utils;

using Xunit;

namespace Gofer.NET.Tests
Expand All @@ -15,11 +16,11 @@ public class GivenATaskClient
public async Task ItContinuesListeningWhenATaskThrowsAnException()
{
var waitTime = 5000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var semaphoreFile = Path.GetTempFileName();

await taskClient.TaskQueue.Enqueue(() => Throw());
await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile));

Expand All @@ -28,37 +29,59 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException()

taskClient.CancelListen();
await task;


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a bug in the test, good catch!

}

[Fact]
public async Task ItStopsOnCancellation()
{
var semaphoreFile = Path.GetTempFileName();

var waitTime = 2000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var cancellation = new CancellationTokenSource();

await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForTaskClientCancellationAndWriteSemaphore(semaphoreFile));

var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None);
await Task.Delay(waitTime, CancellationToken.None);
cancellation.Cancel();
Copy link
Copy Markdown
Author

@ig-sinicyn ig-sinicyn Jul 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enqueued task (WaitForTaskClientCancellationAndWriteSemaphore) will be canceled at this moment.

await Task.Delay(waitTime, CancellationToken.None);
await task;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a bug is introduced, and the task fails to exit on cancellation will this introduce a hanging test?


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
}

[Fact]
public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks()
{
var waitTime = 4000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);

var semaphoreFile = Path.GetTempFileName();
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

await taskClient.TaskQueue.Enqueue(() => Wait(waitTime));

await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile),
TimeSpan.FromMilliseconds(waitTime / 4));

var task = Task.Run(async () => await taskClient.Listen());

await Task.Delay(waitTime / 2);

// Ensure we did not run the scheduled task
File.Exists(semaphoreFile).Should().BeFalse();

var dequeuedScheduledTask = await taskQueue.Dequeue();

File.Exists(semaphoreFile).Should().BeFalse();
dequeuedScheduledTask.Should().NotBeNull();
dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore));
Expand All @@ -83,19 +106,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder()
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

for (var i=0; i<immediateTasks; ++i)
for (var i = 0; i < immediateTasks; ++i)
{
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString()));
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString()));
}

for (var i=0; i<scheduledTasks; ++i)
for (var i = 0; i < scheduledTasks; ++i)
{
await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i)));
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i)));
}

var task = Task.Run(async () => await taskClient.Listen());
Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000);

Expand Down
40 changes: 28 additions & 12 deletions Gofer.NET.Tests/TestQueueTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,38 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;

namespace Gofer.NET.Tests
{
public class TaskQueueTestFixture
{
private static readonly ReaderWriterLock Locker = new ReaderWriterLock();

public static string SemaphoreText => "completed";

public TaskQueue TaskQueue { get; }

public static string RedisConnectionString => "localhost:6379";

private readonly string _semaphoreFile;

public static TaskQueue UniqueRedisTaskQueue(string prefix=null)
public static TaskQueue UniqueRedisTaskQueue(string prefix = null)
{
var taskQueueName = $"{prefix ?? nameof(TaskQueueTestFixture)}::{Guid.NewGuid().ToString()}";
return TaskQueue.Redis(RedisConnectionString, taskQueueName);
}

public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue=null)
public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null)
{
_semaphoreFile = Path.Combine(AppContext.BaseDirectory, uniqueId, Path.GetTempFileName());

var testQueueName = uniqueId + "::TestQueue";
TaskQueue = taskQueue ?? TaskQueueTestFixture.UniqueRedisTaskQueue(uniqueId);

// Clear out the queue
while(TaskQueue.Dequeue().Result != null) { }
while (TaskQueue.Dequeue().Result != null) { }
}

public async Task PushPopExecuteWriteSemaphore()
Expand All @@ -52,12 +53,12 @@ public void EnsureSemaphore()
{
EnsureSemaphore(_semaphoreFile);
}

public static void EnsureSemaphore(string semaphoreFile)
{
try
{
Locker.AcquireReaderLock(30000);
Locker.AcquireReaderLock(30000);
File.ReadAllText(semaphoreFile).Should().Be(SemaphoreText);
}
finally
Expand All @@ -70,12 +71,27 @@ public static void WriteSemaphore(string semaphoreFile)
{
WriteSemaphoreValue(semaphoreFile, SemaphoreText);
}


public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile)
{
var token = TaskClient.GetListenCancellation();
if (!token.CanBeCanceled)
throw new InvalidOperationException("This method must be called from a task client callback");
try
{
await Task.Delay(-1, token);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the real potential for a test hang is here. Perhaps a polling loop with a timeout?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Fixed with a timeout parameter.

}
catch (OperationCanceledException)
{
}
WriteSemaphore(semaphoreFile);
}

public static void WriteSemaphoreValue(string semaphoreFile, object value)
{
try
{
Locker.AcquireWriterLock(30000);
Locker.AcquireWriterLock(30000);
File.AppendAllText(semaphoreFile, value?.ToString() ?? "null");
}
finally
Expand Down
72 changes: 45 additions & 27 deletions TaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Gofer.NET.Errors;

using Gofer.NET.Utils;
using Newtonsoft.Json;

namespace Gofer.NET
{
public class TaskClient
{
private static readonly AsyncLocal<CancellationToken> _listenCancellationContext = new AsyncLocal<CancellationToken>();

private static readonly object Locker = new object();

private const int PollDelay = 100;

private bool IsCanceled { get; set; }
public static CancellationToken GetListenCancellation() => _listenCancellationContext.Value;

public TaskQueue TaskQueue { get; }

public Action<Exception> OnError { get; }
Expand All @@ -29,35 +30,46 @@ public class TaskClient
private CancellationTokenSource ListenCancellationTokenSource { get; set; }

public TaskClient(
TaskQueue taskQueue,
Action<Exception> onError=null)
TaskQueue taskQueue,
Action<Exception> onError = null)
{
TaskQueue = taskQueue;
OnError = onError;
TaskScheduler = new TaskScheduler(TaskQueue);
IsCanceled = false;
}

public async Task Listen()
public Task Listen()
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clean up the mix of being able to pass in a cancellation token here and at the same time have a default cancellation token source I recommend the following:

  1. Change the Start(CancellationToken cancellation) overload to Start(CancellationTokenSource cancellationTokenSource)

  2. Inside of the Listen(CancellationToken cancellation) use CancellationTokenSource.CreateLinkedTokenSource(cancellation) to create a CancellationTokenSource to pass to Start()

  3. Inside the Listen() method overload, do not call the other overload of Listen but instead duplicate the code and call the Start() overload with no parameters.

  4. Inside the Start() overload with no parameters use new CancellationTokenSource() and use it to call the other overload of Start Start(CancellationToken cancellation)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current implementation has a very low overhead so I'd prefer to keep it until there are some scenarios it does not fit nicely.

Official docs recommend to pass CancellationToken, not CancellationTokenSource. Linked cancellation token is a standard approach for supporting both external and internal cancellation. So there is nothing unusual and surprising for end-user code.

{
return Listen(CancellationToken.None);
}

public async Task Listen(CancellationToken cancellation)
{
Start();
Start(cancellation);

await Task.WhenAll(new [] {
TaskRunnerThread,
await Task.WhenAll(new[] {
TaskRunnerThread,
TaskSchedulerThread});
}

public CancellationTokenSource Start()
{
return Start(CancellationToken.None);
}

public CancellationTokenSource Start(CancellationToken cancellation)
{
if (TaskSchedulerThread != null || TaskRunnerThread != null)
{
throw new Exception("This TaskClient is already listening.");
}

ListenCancellationTokenSource = new CancellationTokenSource();

ListenCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
var token = ListenCancellationTokenSource.Token;

TaskSchedulerThread = Task.Run(async () => {
TaskSchedulerThread = Task.Run(async () =>
{
var inThreadTaskScheduler = new TaskScheduler(TaskQueue);

while (true)
Expand All @@ -69,43 +81,49 @@ public CancellationTokenSource Start()

await inThreadTaskScheduler.Tick();
}
}, ListenCancellationTokenSource.Token);
}, token);

TaskRunnerThread = Task.Run(async () => {
TaskRunnerThread = Task.Run(async () =>
{
while (true)
{
if (token.IsCancellationRequested)
{
return;
}
await ExecuteQueuedTask();

await ExecuteQueuedTask(token);
}
}, ListenCancellationTokenSource.Token);
}, token);

return ListenCancellationTokenSource;
}

private async Task ExecuteQueuedTask()
private async Task ExecuteQueuedTask(CancellationToken token)
{
var (json, info) = await TaskQueue.SafeDequeue();
if (info != null)
{
LogTaskStarted(info);

var old = _listenCancellationContext.Value;
try
{
var now = DateTime.Now;

_listenCancellationContext.Value = token;

var executionTimer = Stopwatch.StartNew();
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

await info.ExecuteTask();

var completionSeconds = (DateTime.Now - now).TotalSeconds;
executionTimer.Stop();
var completionSeconds = executionTimer.Elapsed.TotalSeconds;
LogTaskFinished(info, completionSeconds);
}
catch (Exception e)
{
LogTaskException(info, e);
}
finally
{
_listenCancellationContext.Value = old;
}
}
}

Expand All @@ -122,7 +140,7 @@ private void LogTaskStarted(TaskInfo info)
var logMessage = Messages.TaskStarted(info);
ThreadSafeColoredConsole.Info(logMessage);
}

private void LogTaskFinished(TaskInfo info, double completionSeconds)
{
var logMessage = Messages.TaskFinished(info, completionSeconds);
Expand Down
8 changes: 7 additions & 1 deletion TaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ public async Task Enqueue(Expression<Action> expression)
var taskInfo = expression.ToTaskInfo();
await Enqueue(taskInfo);
}


public async Task Enqueue<T>(Expression<Func<T>> expression)
{
var taskInfo = expression.ToTaskInfo();
await Enqueue(taskInfo);
}

internal async Task Enqueue(TaskInfo taskInfo)
{
taskInfo.ConvertTypeArgs();
Expand Down