Skip to content

Commit 3381bc2

Browse files
authored
Improve docs, update deps, fix key sort logic (#401)
* Improve docs, update deps, fix key sort logic Expanded README with detailed usage, API, and troubleshooting sections. Updated NuGet dependencies in test and main projects. Fixed KeyedOperation sorting to prioritize unkeyed operations correctly. Improved cancellation token handling in OperationQueueExtensions. Minor solution and XML doc updates. * Remove Splat as unused * Update OperationQueueExtensions.cs
1 parent 8c15637 commit 3381bc2

File tree

7 files changed

+173
-57
lines changed

7 files changed

+173
-57
lines changed

README.md

Lines changed: 147 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,163 @@ Then, you try to manage issuing less requests by hand, and it becomes a
2424
spaghetti mess as different parts of your app reach into each other to try to
2525
figure out who's doing what. Let's figure out a better way.
2626

27-
### So many words, gimme the examples
28-
29-
```cs
30-
var wc = new WebClient();
31-
var opQueue = new OperationQueue(2 /*at a time*/);
32-
33-
// Download a bunch of images
34-
var foo = opQueue.Enqueue(1,
35-
() => wc.DownloadFile("https://example.com/foo.jpg", "foo.jpg"));
36-
var bar = opQueue.Enqueue(1,
37-
() => wc.DownloadFile("https://example.com/bar.jpg", "bar.jpg"));
38-
var baz = opQueue.Enqueue(1,
39-
() => wc.DownloadFile("https://example.com/baz.jpg", "baz.jpg"));
40-
var bamf = opQueue.Enqueue(1,
41-
() => wc.DownloadFile("https://example.com/bamf.jpg", "bamf.jpg"));
42-
43-
// We'll be downloading the images two at a time, even though we started
44-
// them all at once
45-
await Task.WaitAll(foo, bar, baz, bamf);
27+
## Key features
28+
29+
- Bounded concurrency with a priority-aware semaphore
30+
- Priority scheduling (higher number runs first)
31+
- Key-based serialization (only one operation per key runs at a time)
32+
- Pause/resume with reference counting
33+
- Cancellation via CancellationToken or IObservable
34+
- Task and IObservable friendly API
35+
36+
## Install
37+
38+
- NuGet: `dotnet add package Punchclock`
39+
40+
## Quick start
41+
42+
```csharp
43+
using Punchclock;
44+
using System.Net.Http;
45+
46+
var queue = new OperationQueue(maximumConcurrent: 2);
47+
var http = new HttpClient();
48+
49+
// Fire a bunch of downloads – only two will run at a time
50+
var t1 = queue.Enqueue(1, () => http.GetStringAsync("https://example.com/a"));
51+
var t2 = queue.Enqueue(1, () => http.GetStringAsync("https://example.com/b"));
52+
var t3 = queue.Enqueue(1, () => http.GetStringAsync("https://example.com/c"));
53+
await Task.WhenAll(t1, t2, t3);
54+
```
55+
56+
## Priorities
57+
58+
- Higher numbers win. A priority 10 operation will preempt priority 1 when a slot opens.
59+
60+
```csharp
61+
await queue.Enqueue(10, () => http.GetStringAsync("https://example.com/urgent"));
62+
```
63+
64+
## Keys: serialize related work
65+
66+
- Use a key to ensure only one operation for that key runs at a time.
67+
- Useful to avoid thundering herds against the same resource.
68+
69+
```csharp
70+
// These will run one-after-another because they share the same key
71+
var k1 = queue.Enqueue(1, key: "user:42", () => LoadUserAsync(42));
72+
var k2 = queue.Enqueue(1, key: "user:42", () => LoadUserPostsAsync(42));
73+
await Task.WhenAll(k1, k2);
4674
```
4775

48-
Now, in a completely different part of your app, if you need something right
49-
away, you can specify it via the priority:
76+
## Cancellation
77+
78+
- Via CancellationToken:
79+
80+
```csharp
81+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
82+
await queue.Enqueue(1, key: "img:1", () => DownloadImageAsync("/1"), cts.Token);
83+
```
84+
85+
- Via IObservable cancellation signal:
86+
87+
```csharp
88+
var cancel = new Subject<Unit>();
89+
var obs = queue.EnqueueObservableOperation(1, "slow", cancel, () => Expensive().ToObservable());
90+
cancel.OnNext(Unit.Default); // cancels if not yet running or in-flight
91+
```
92+
93+
## Pause and resume
94+
95+
```csharp
96+
var gate = queue.PauseQueue();
97+
// enqueue work while paused; nothing executes yet
98+
// ...
99+
gate.Dispose(); // resumes and drains respecting priority/keys
100+
```
101+
102+
## Adjust concurrency at runtime
103+
104+
```csharp
105+
queue.SetMaximumConcurrent(8); // increases throughput
106+
```
107+
108+
## Shutting down
109+
110+
```csharp
111+
await queue.ShutdownQueue(); // completes when outstanding work finishes
112+
```
50113

51-
```cs
52-
// This file is super important, we don't care if it cuts in line in front
53-
// of some images or other stuff
54-
var wc = new WebClient();
55-
await opQueue.Enqueue(10 /* It's Important */,
56-
() => wc.DownloadFileTaskAsync("http://example.com/cool.txt", "./cool.txt"));
114+
## API overview
115+
116+
- OperationQueue
117+
- ctor(int maximumConcurrent = 4)
118+
- IObservable<T> EnqueueObservableOperation<T>(int priority, Func<IObservable<T>>)
119+
- IObservable<T> EnqueueObservableOperation<T>(int priority, string key, Func<IObservable<T>>)
120+
- IObservable<T> EnqueueObservableOperation<T, TDontCare>(int priority, string key, IObservable<TDontCare> cancel, Func<IObservable<T>>)
121+
- IDisposable PauseQueue()
122+
- void SetMaximumConcurrent(int maximumConcurrent)
123+
- IObservable<Unit> ShutdownQueue()
124+
125+
- OperationQueueExtensions
126+
- Task Enqueue(int priority, Func<Task>)
127+
- Task<T> Enqueue<T>(int priority, Func<Task<T>>)
128+
- Task Enqueue(int priority, string key, Func<Task>)
129+
- Task<T> Enqueue<T>(int priority, string key, Func<Task<T>>)
130+
- Overloads with CancellationToken for all of the above
131+
132+
## Best practices
133+
134+
- Prefer Task-based Enqueue APIs in application code; use observable APIs when composing with Rx.
135+
- Use descriptive keys for shared resources (e.g., "user:{id}", "file:{path}").
136+
- Keep operations idempotent and short; long operations block concurrency slots.
137+
- Use higher priorities sparingly; they jump the queue when a slot opens.
138+
- PauseQueue is ref-counted; always dispose the returned handle exactly once.
139+
- For cancellation via token, reuse CTS per user action to cancel pending work quickly.
140+
141+
## Advanced notes
142+
143+
- Unkeyed work is prioritized ahead of keyed work internally to keep the pipeline flowing; keys are serialized per group.
144+
- The semaphore releases when an operation completes, errors, or is canceled.
145+
- Cancellation before evaluation prevents invoking the supplied function.
146+
147+
## Full examples
148+
149+
- Image downloader with keys and priorities
150+
151+
```csharp
152+
var queue = new OperationQueue(3);
153+
154+
Task Download(string url, string dest, int pri, string key) =>
155+
queue.Enqueue(pri, key, async () =>
156+
{
157+
using var http = new HttpClient();
158+
var bytes = await http.GetByteArrayAsync(url);
159+
await File.WriteAllBytesAsync(dest, bytes);
160+
});
161+
162+
var tasks = new[]
163+
{
164+
Download("https://example.com/a.jpg", "a.jpg", 1, "img"),
165+
Download("https://example.com/b.jpg", "b.jpg", 1, "img"),
166+
queue.Enqueue(5, () => Task.Delay(100)), // higher priority misc work
167+
};
168+
await Task.WhenAll(tasks);
57169
```
58170

59-
## What else can this library do
171+
## Troubleshooting
60172

61-
* Cancellation via CancellationTokens or via Observables
62-
* Ensure certain operations don't run concurrently via a key
63-
* Queue pause / resume
173+
- Nothing runs? Ensure you didn't leave the queue paused. Dispose the token from PauseQueue.
174+
- Starvation? Check if you assigned very high priorities to long-running tasks.
175+
- Deadlock-like behavior with keys? Remember keyed operations are strictly serialized; avoid long critical sections.
64176

65177
## Contribute
66178

67179
Punchclock is developed under an OSI-approved open source license, making it freely usable and distributable, even for commercial use. Because of our Open Collective model for funding and transparency, we are able to funnel support and funds through to our contributors and community. We ❤ the people who are involved in this project, and we’d love to have you on board, especially if you are just getting started or have never contributed to open-source before.
68180

69181
So here's to you, lovely person who wants to join us — this is how you can support us:
70182

71-
* [Responding to questions on StackOverflow](https://stackoverflow.com/questions/tagged/punchclock)
72-
* [Passing on knowledge and teaching the next generation of developers](https://ericsink.com/entries/dont_use_rxui.html)
73-
* Submitting documentation updates where you see fit or lacking.
74-
* Making contributions to the code base.
183+
- [Responding to questions on StackOverflow](https://stackoverflow.com/questions/tagged/punchclock)
184+
- [Passing on knowledge and teaching the next generation of developers](https://ericsink.com/entries/dont_use_rxui.html)
185+
- Submitting documentation updates where you see fit or lacking.
186+
- Making contributions to the code base.

src/Punchclock.Tests/Punchclock.Tests.csproj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66

77
<ItemGroup>
88
<PackageReference Include="DynamicData" Version="9.4.1" />
9-
<PackageReference Include="splat" Version="15.*" />
109
<PackageReference Include="PublicApiGenerator" Version="11.4.6" />
11-
<PackageReference Include="Verify.Xunit" Version="30.7.3" />
10+
<PackageReference Include="Verify.Xunit" Version="30.10.0" />
1211
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
1312
<PackageReference Include="xunit" Version="2.9.3" />
1413
<PackageReference Include="xunit.runner.console" Version="2.9.3" />

src/Punchclock.sln

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
2-
# 17
2+
# Visual Studio Version 17
33
VisualStudioVersion = 17.3.32922.545
44
MinimumVisualStudioVersion = 10.0.40219.1
55
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Punchclock", "Punchclock\Punchclock.csproj", "{D3D5E08E-2DAA-4C14-BDF1-C15BD81247F5}"
@@ -11,6 +11,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Items", "Items", "{E035091F
1111
analyzers.ruleset = analyzers.ruleset
1212
analyzers.tests.ruleset = analyzers.tests.ruleset
1313
Directory.build.props = Directory.build.props
14+
..\LICENSE = ..\LICENSE
1415
..\README.md = ..\README.md
1516
stylecop.json = stylecop.json
1617
..\version.json = ..\version.json

src/Punchclock/KeyedOperation.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public int CompareTo(KeyedOperation other)
3434
// up concurrency slots
3535
if (KeyIsDefault != other.KeyIsDefault)
3636
{
37-
return KeyIsDefault ? 1 : -1;
37+
// If this is non-keyed (default), it should sort before keyed -> return -1
38+
return KeyIsDefault ? -1 : 1;
3839
}
3940

4041
return other.Priority.CompareTo(Priority);

src/Punchclock/OperationQueue.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,23 @@
1515
namespace Punchclock;
1616

1717
/// <summary>
18+
/// <para>
1819
/// OperationQueue is the core of PunchClock, and represents a scheduler for
1920
/// deferred actions, such as network requests. This scheduler supports
2021
/// scheduling via priorities, as well as serializing requests that access
2122
/// the same data.
22-
///
23+
/// </para>
24+
/// <para>
2325
/// The queue allows a fixed number of concurrent in-flight operations at a
2426
/// time. When there are available "slots", items are dispatched as they come
2527
/// in. When the slots are full, the queueing policy starts to apply.
26-
///
28+
/// </para>
29+
/// <para>
2730
/// The queue, similar to Akavache's KeyedOperationQueue, also allows keys to
2831
/// be specified to serialize operations - if you have three "foo" items, they
2932
/// will wait in line and only one "foo" can run. However, a "bar" and "baz"
3033
/// item can run at the same time as a "foo" item.
34+
/// </para>
3135
/// </summary>
3236
public class OperationQueue : IDisposable
3337
{

src/Punchclock/OperationQueueExtensions.cs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
using System;
77
using System.Reactive;
8+
using System.Reactive.Disposables;
89
using System.Reactive.Linq;
9-
using System.Reactive.Subjects;
1010
using System.Reactive.Threading.Tasks;
1111
using System.Threading;
1212
using System.Threading.Tasks;
@@ -135,20 +135,19 @@ public static Task Enqueue(this OperationQueue operationQueue, int priority, Fun
135135
.ToTask();
136136
}
137137

138-
private static IObservable<Unit> ConvertTokenToObservable(CancellationToken token)
139-
{
140-
var cancel = new AsyncSubject<Unit>();
141-
142-
if (token.IsCancellationRequested)
143-
{
144-
return Observable.Throw<Unit>(new ArgumentException("Token is already cancelled"));
145-
}
146-
147-
token.Register(() =>
138+
private static IObservable<Unit> ConvertTokenToObservable(CancellationToken token) =>
139+
Observable.Create<Unit>(observer =>
148140
{
149-
cancel.OnNext(Unit.Default);
150-
cancel.OnCompleted();
141+
if (token.IsCancellationRequested)
142+
{
143+
observer.OnError(new ArgumentException("Token is already cancelled", nameof(token)));
144+
return Disposable.Empty;
145+
}
146+
147+
return token.Register(() =>
148+
{
149+
observer.OnNext(Unit.Default);
150+
observer.OnCompleted();
151+
});
151152
});
152-
return cancel;
153-
}
154153
}

src/Punchclock/Punchclock.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="System.Reactive" Version="6.0.1" />
10+
<PackageReference Include="System.Reactive" Version="6.0.2" />
1111
</ItemGroup>
1212

1313
</Project>

0 commit comments

Comments
 (0)