diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index 67ee2f1..b3ab7f0 100644
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -16,17 +16,35 @@ jobs:
outputs:
nbgv: ${{ steps.nbgv.outputs.SemVer2 }}
steps:
+ - name: Get Current Visual Studio Information
+ shell: bash
+ run: |
+ dotnet tool update -g dotnet-vs
+ echo "## About RELEASE ##"
+ vs where release
+
+ - name: Update Visual Studio Latest Release
+ shell: bash
+ run: |
+ echo "## Update RELEASE ##"
+ vs update release Enterprise
+ vs modify release Enterprise +mobile +desktop +uwp +web
+ echo "## About RELEASE Updated ##"
+ vs where release
+ echo "##vso[task.prependpath]$(vs where release --prop=InstallationPath)\MSBuild\Current\Bin"
+
- name: Checkout
uses: actions/checkout@v3.1.0
with:
fetch-depth: 0
lfs: true
- - name: Install .NET 6
+ - name: Install .NET 6 & .NET7
uses: actions/setup-dotnet@v3
with:
- dotnet-version: 6.0.x
- dotnet-quality: 'preview'
+ dotnet-version: |
+ 6.0.x
+ 7.0.x
- name: NBGV
id: nbgv
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 223ae9f..b4aa7e2 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -16,17 +16,35 @@ jobs:
outputs:
nbgv: ${{ steps.nbgv.outputs.SemVer2 }}
steps:
+ - name: Get Current Visual Studio Information
+ shell: bash
+ run: |
+ dotnet tool update -g dotnet-vs
+ echo "## About RELEASE ##"
+ vs where release
+
+ - name: Update Visual Studio Latest Release
+ shell: bash
+ run: |
+ echo "## Update RELEASE ##"
+ vs update release Enterprise
+ vs modify release Enterprise +mobile +desktop +uwp +web
+ echo "## About RELEASE Updated ##"
+ vs where release
+ echo "##vso[task.prependpath]$(vs where release --prop=InstallationPath)\MSBuild\Current\Bin"
+
- name: Checkout
uses: actions/checkout@v3.1.0
with:
fetch-depth: 0
lfs: true
-
- - name: Install .NET 6
+
+ - name: Install .NET 6 & .NET7
uses: actions/setup-dotnet@v3
with:
- dotnet-version: 6.0.x
- dotnet-quality: 'preview'
+ dotnet-version: |
+ 6.0.x
+ 7.0.x
- name: NBGV
id: nbgv
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index af6c020..1ab9ac3 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -52,7 +52,7 @@
-
+
diff --git a/src/Minimalist.Reactive.Tests/AsyncSignalTests.cs b/src/Minimalist.Reactive.Tests/AsyncSignalTests.cs
new file mode 100644
index 0000000..02a9d8f
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/AsyncSignalTests.cs
@@ -0,0 +1,314 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Minimalist.Reactive.Signals;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// AsyncSignalTests.
+///
+public class AsyncSignalTests
+{
+ ///
+ /// Subscribes the argument checking.
+ ///
+ [Fact]
+ public void Subscribe_ArgumentChecking() =>
+ Assert.Throws(() => new AsyncSignal().Subscribe(null!));
+
+ ///
+ /// Called when [error argument checking].
+ ///
+ [Fact]
+ public void OnError_ArgumentChecking() =>
+ Assert.Throws(() => new AsyncSignal().OnError(null!));
+
+ ///
+ /// Awaits the blocking.
+ ///
+ [Fact]
+ public void Await_Blocking()
+ {
+ var s = new AsyncSignal();
+ GetResult_BlockingImpl(s.GetAwaiter());
+ }
+
+ ///
+ /// Awaits the throw.
+ ///
+ [Fact]
+ public void Await_Throw()
+ {
+ var s = new AsyncSignal();
+ GetResult_Blocking_ThrowImpl(s.GetAwaiter());
+ }
+
+ ///
+ /// Gets the result empty.
+ ///
+ [Fact]
+ public void GetResult_Empty()
+ {
+ var s = new AsyncSignal();
+ s.OnCompleted();
+ Assert.Throws(() => s.GetResult());
+ }
+
+ ///
+ /// Gets the result blocking.
+ ///
+ [Fact]
+ public void GetResult_Blocking() => GetResult_BlockingImpl(new AsyncSignal());
+
+ ///
+ /// Gets the result blocking throw.
+ ///
+ [Fact]
+ public void GetResult_Blocking_Throw() => GetResult_Blocking_ThrowImpl(new AsyncSignal());
+
+ ///
+ /// Gets the result context.
+ ///
+ [Fact]
+ public void GetResult_Context()
+ {
+ var x = new AsyncSignal();
+
+ var ctx = new MyContext();
+ var e = new ManualResetEvent(false);
+
+ Task.Run(() =>
+ {
+ SynchronizationContext.SetSynchronizationContext(ctx);
+
+ var a = x.GetAwaiter();
+ a.OnCompleted(() => e.Set());
+ });
+
+ x.OnNext(42);
+ x.OnCompleted();
+
+ e.WaitOne();
+
+ Assert.True(ctx.Ran);
+ }
+
+ ///
+ /// Determines whether this instance has observers.
+ ///
+ [Fact]
+ public void HasObservers()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+
+ var d1 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d1.Dispose();
+ Assert.False(s.HasObservers);
+
+ var d2 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ var d3 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d2.Dispose();
+ Assert.True(s.HasObservers);
+
+ d3.Dispose();
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Determines whether [has observers dispose1].
+ ///
+ [Fact]
+ public void HasObservers_Dispose1()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers dispose2].
+ ///
+ [Fact]
+ public void HasObservers_Dispose2()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers dispose3].
+ ///
+ [Fact]
+ public void HasObservers_Dispose3()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers on completed].
+ ///
+ [Fact]
+ public void HasObservers_OnCompleted()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnCompleted();
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Determines whether [has observers on error].
+ ///
+ [Fact]
+ public void HasObservers_OnError()
+ {
+ var s = new AsyncSignal();
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { }, _ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnError(new Exception());
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Gets the result blocking implementation.
+ ///
+ /// The s.
+ private static void GetResult_BlockingImpl(IAwaitSignal s)
+ {
+ Assert.False(s.IsCompleted);
+
+ var e = new ManualResetEvent(false);
+
+ new Thread(() =>
+ {
+ e.WaitOne();
+ s.OnNext(42);
+ s.OnCompleted();
+ }).Start();
+
+ var y = default(int);
+ var t = new Thread(() => y = s.GetResult());
+ t.Start();
+
+ while (t.ThreadState != ThreadState.WaitSleepJoin)
+ {
+ }
+
+ e.Set();
+ t.Join();
+
+ Assert.Equal(42, y);
+ Assert.True(s.IsCompleted);
+ }
+
+ ///
+ /// Gets the result blocking throw implementation.
+ ///
+ /// The s.
+ private static void GetResult_Blocking_ThrowImpl(IAwaitSignal s)
+ {
+ Assert.False(s.IsCompleted);
+
+ var e = new ManualResetEvent(false);
+
+ var ex = new Exception();
+
+ new Thread(() =>
+ {
+ e.WaitOne();
+ s.OnError(ex);
+ }).Start();
+
+ var y = default(Exception);
+ var t = new Thread(() =>
+ {
+ try
+ {
+ s.GetResult();
+ }
+ catch (Exception ex_)
+ {
+ y = ex_;
+ }
+ });
+ t.Start();
+
+ while (t.ThreadState != ThreadState.WaitSleepJoin)
+ {
+ }
+
+ e.Set();
+ t.Join();
+
+ Assert.Same(ex, y);
+ Assert.True(s.IsCompleted);
+ }
+
+ private class MyContext : SynchronizationContext
+ {
+ public bool Ran { get; set; }
+
+ public override void Post(SendOrPostCallback d, object? state)
+ {
+ Ran = true;
+ d(state);
+ }
+ }
+}
diff --git a/src/Minimalist.Reactive.Tests/BehaviourSignalTests.cs b/src/Minimalist.Reactive.Tests/BehaviourSignalTests.cs
new file mode 100644
index 0000000..74ad503
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/BehaviourSignalTests.cs
@@ -0,0 +1,288 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using Minimalist.Reactive.Signals;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// BehaviourSignalTests.
+///
+public class BehaviourSignalTests
+{
+ ///
+ /// Subscribes the argument checking.
+ ///
+ [Fact]
+ public void Subscribe_ArgumentChecking() =>
+ Assert.Throws(() => new BehaviourSignal(1).Subscribe(null!));
+
+ ///
+ /// Called when [error argument checking].
+ ///
+ [Fact]
+ public void OnError_ArgumentChecking() =>
+ Assert.Throws(() => new BehaviourSignal(1).OnError(null!));
+
+ ///
+ /// Determines whether this instance has observers.
+ ///
+ [Fact]
+ public void HasObservers()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+
+ var d1 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d1.Dispose();
+ Assert.False(s.HasObservers);
+
+ var d2 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ var d3 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d2.Dispose();
+ Assert.True(s.HasObservers);
+
+ d3.Dispose();
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Determines whether [has observers dispose1].
+ ///
+ [Fact]
+ public void HasObservers_Dispose1()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers dispose2].
+ ///
+ [Fact]
+ public void HasObservers_Dispose2()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers dispose3].
+ ///
+ [Fact]
+ public void HasObservers_Dispose3()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ ///
+ /// Determines whether [has observers on completed].
+ ///
+ [Fact]
+ public void HasObservers_OnCompleted()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnCompleted();
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Determines whether [has observers on error].
+ ///
+ [Fact]
+ public void HasObservers_OnError()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { }, _ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnError(new Exception());
+ Assert.False(s.HasObservers);
+ }
+
+ ///
+ /// Values the initial.
+ ///
+ [Fact]
+ public void Value_Initial()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ Assert.True(s.TryGetValue(out var x));
+ Assert.Equal(42, x);
+ }
+
+ ///
+ /// Values the first.
+ ///
+ [Fact]
+ public void Value_First()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ Assert.True(s.TryGetValue(out var x));
+ Assert.Equal(42, x);
+
+ s.OnNext(43);
+ Assert.Equal(43, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(43, x);
+ }
+
+ ///
+ /// Values the second.
+ ///
+ [Fact]
+ public void Value_Second()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ Assert.True(s.TryGetValue(out var x));
+ Assert.Equal(42, x);
+
+ s.OnNext(43);
+ Assert.Equal(43, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(43, x);
+
+ s.OnNext(44);
+ Assert.Equal(44, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(44, x);
+ }
+
+ ///
+ /// Values the frozen after on completed.
+ ///
+ [Fact]
+ public void Value_FrozenAfterOnCompleted()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ Assert.True(s.TryGetValue(out var x));
+ Assert.Equal(42, x);
+
+ s.OnNext(43);
+ Assert.Equal(43, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(43, x);
+
+ s.OnNext(44);
+ Assert.Equal(44, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(44, x);
+
+ s.OnCompleted();
+ Assert.Equal(44, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(44, x);
+
+ s.OnNext(1234);
+ Assert.Equal(44, s.Value);
+
+ Assert.True(s.TryGetValue(out x));
+ Assert.Equal(44, x);
+ }
+
+ ///
+ /// Values the throws after on error.
+ ///
+ [Fact]
+ public void Value_ThrowsAfterOnError()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ s.OnError(new InvalidOperationException());
+
+ Assert.Throws(() =>
+ {
+ var ignored = s.Value;
+ });
+
+ Assert.Throws(() => s.TryGetValue(out var x));
+ }
+
+ ///
+ /// Values the throws on dispose.
+ ///
+ [Fact]
+ public void Value_ThrowsOnDispose()
+ {
+ var s = new BehaviourSignal(42);
+ Assert.Equal(42, s.Value);
+
+ s.Dispose();
+
+ Assert.Throws(() =>
+ {
+ var ignored = s.Value;
+ });
+
+ Assert.False(s.TryGetValue(out var x));
+ }
+}
diff --git a/src/Minimalist.Reactive.Tests/ConcurencyTests.cs b/src/Minimalist.Reactive.Tests/ConcurencyTests.cs
new file mode 100644
index 0000000..062c31e
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/ConcurencyTests.cs
@@ -0,0 +1,119 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Threading;
+using Minimalist.Reactive.Concurrency;
+using Minimalist.Reactive.Disposables;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// ConcurencyTests.
+///
+public class ConcurencyTests
+{
+ ///
+ /// Tests this instance.
+ ///
+ [Fact]
+ public void TestCreate()
+ {
+ var scheduler = TaskPoolScheduler.Instance;
+ var disposable = scheduler.Schedule(0, (__, _) => Disposable.Empty);
+ Assert.NotNull(disposable);
+ disposable.Dispose();
+ }
+
+ ///
+ /// Tasks the pool now.
+ ///
+ [Fact]
+ public void TaskPoolNow()
+ {
+ var res = TaskPoolScheduler.Instance.Now - DateTime.Now;
+ Assert.True(res.Seconds < 1);
+ }
+
+ ///
+ /// Tasks the pool schedule action.
+ ///
+ [Fact]
+ public void TaskPoolScheduleAction()
+ {
+ var id = Environment.CurrentManagedThreadId;
+ var nt = TaskPoolScheduler.Instance;
+ var evt = new ManualResetEvent(false);
+ nt.Schedule(() =>
+ {
+ Assert.NotEqual(id, Environment.CurrentManagedThreadId);
+ evt.Set();
+ });
+ evt.WaitOne();
+ }
+
+ ///
+ /// Tasks the pool schedule action due now.
+ ///
+ [Fact]
+ public void TaskPoolScheduleActionDueNow()
+ {
+ var id = Environment.CurrentManagedThreadId;
+ var nt = TaskPoolScheduler.Instance;
+ var evt = new ManualResetEvent(false);
+ nt.Schedule(TimeSpan.Zero, () =>
+ {
+ Assert.NotEqual(id, Environment.CurrentManagedThreadId);
+ evt.Set();
+ });
+ evt.WaitOne();
+ }
+
+ ///
+ /// Tasks the pool schedule action due.
+ ///
+ [Fact]
+ public void TaskPoolScheduleActionDue()
+ {
+ var id = Environment.CurrentManagedThreadId;
+ var nt = TaskPoolScheduler.Instance;
+ var evt = new ManualResetEvent(false);
+ nt.Schedule(TimeSpan.FromMilliseconds(1), () =>
+ {
+ Assert.NotEqual(id, Environment.CurrentManagedThreadId);
+ evt.Set();
+ });
+ evt.WaitOne();
+ }
+
+ ///
+ /// Tasks the pool schedule action cancel.
+ ///
+ [Fact]
+ public void TaskPoolScheduleActionCancel()
+ {
+ var id = Environment.CurrentManagedThreadId;
+ var nt = TaskPoolScheduler.Instance;
+ var set = false;
+ var d = nt.Schedule(TimeSpan.FromSeconds(0.2), () => set = true);
+ d.Dispose();
+ Thread.Sleep(400);
+ Assert.False(set);
+ }
+
+ ///
+ /// Tasks the pool delay larger than int maximum value.
+ ///
+ [Fact]
+ public void TaskPoolDelayLargerThanIntMaxValue()
+ {
+ var dueTime = TimeSpan.FromMilliseconds((double)int.MaxValue + 1);
+
+ // Just ensuring the call to Schedule does not throw.
+ var d = TaskPoolScheduler.Instance.Schedule(dueTime, () => { });
+
+ d.Dispose();
+ }
+}
diff --git a/src/Minimalist.Reactive.Tests/DisposableTests.cs b/src/Minimalist.Reactive.Tests/DisposableTests.cs
index 802066d..ed253eb 100644
--- a/src/Minimalist.Reactive.Tests/DisposableTests.cs
+++ b/src/Minimalist.Reactive.Tests/DisposableTests.cs
@@ -1,114 +1,113 @@
-// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using Minimalist.Reactive.Disposables;
using Xunit;
-namespace Minimalist.Reactive.Tests
+namespace Minimalist.Reactive.Tests;
+
+///
+/// DisposableTests.
+///
+public class DisposableTests
{
+ ///
+ /// Called when [dispose once].
+ ///
+ [Fact]
+ public void OnlyDisposeOnce()
+ {
+ var disposed = 0;
+ var disposable = Disposable.Create(() => disposed++);
+
+ disposable.Dispose();
+
+ Assert.Equal(1, disposed);
+
+ disposable.Dispose();
+
+ Assert.Equal(1, disposed);
+ }
///
- /// DisposableTests.
+ /// Empties the disposable.
///
- public class DisposableTests
+ [Fact]
+ public void EmptyDisposable()
{
- ///
- /// Called when [dispose once].
- ///
- [Fact]
- public void OnlyDisposeOnce()
- {
- var disposed = 0;
- var disposable = Disposable.Create(() => disposed++);
-
- disposable.Dispose();
-
- Assert.Equal(1, disposed);
-
- disposable.Dispose();
-
- Assert.Equal(1, disposed);
- }
-
- ///
- /// Empties the disposable.
- ///
- [Fact]
- public void EmptyDisposable()
- {
- var disposable = Disposable.Empty;
- disposable.Dispose();
- disposable.Dispose();
- disposable.Dispose();
- }
-
- ///
- /// Singles the disposable dispose.
- ///
- [Fact]
- public void SingleDisposableDispose()
- {
- var disposable = new SingleDisposable(Disposable.Empty);
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- }
-
- ///
- /// Singles the disposable dispose with action.
- ///
- [Fact]
- public void SingleDisposableDisposeWithAction()
- {
- var disposed = 0;
- var disposable = new SingleDisposable(Disposable.Empty, () => disposed++);
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- Assert.Equal(1, disposed);
-
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- Assert.Equal(1, disposed);
-
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- Assert.Equal(1, disposed);
- }
-
- ///
- /// Multiples the disposable dispose.
- ///
- [Fact]
- public void MultipleDisposableDispose()
- {
- var disposable = new MultipleDisposable();
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- }
-
- ///
- /// Multiples the disposable with items dispose.
- ///
- [Fact]
- public void MultipleDisposableWithItemsDispose()
- {
- var disposable = new MultipleDisposable();
- disposable.Add(Disposable.Empty);
- var disposed = 0;
-
- // create a disposable that will be disposed when the MultipleDisposable is disposed
- var singleDisposable = Disposable.Empty.DisposeWith(() => disposed++);
-
- // add the disposable to the MultipleDisposable
- singleDisposable?.DisposeWith(disposable);
-
- var singleDisposable2 = Disposable.Empty.DisposeWith();
- singleDisposable2?.DisposeWith(disposable);
-
- disposable.Dispose();
- Assert.True(disposable.IsDisposed);
- Assert.True(singleDisposable?.IsDisposed);
- Assert.True(singleDisposable2?.IsDisposed);
- Assert.Equal(1, disposed);
- }
+ var disposable = Disposable.Empty;
+ disposable.Dispose();
+ disposable.Dispose();
+ disposable.Dispose();
+ }
+
+ ///
+ /// Singles the disposable dispose.
+ ///
+ [Fact]
+ public void SingleDisposableDispose()
+ {
+ var disposable = new SingleDisposable(Disposable.Empty);
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ }
+
+ ///
+ /// Singles the disposable dispose with action.
+ ///
+ [Fact]
+ public void SingleDisposableDisposeWithAction()
+ {
+ var disposed = 0;
+ var disposable = new SingleDisposable(Disposable.Empty, () => disposed++);
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ Assert.Equal(1, disposed);
+
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ Assert.Equal(1, disposed);
+
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ Assert.Equal(1, disposed);
+ }
+
+ ///
+ /// Multiples the disposable dispose.
+ ///
+ [Fact]
+ public void MultipleDisposableDispose()
+ {
+ var disposable = new MultipleDisposable();
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ }
+
+ ///
+ /// Multiples the disposable with items dispose.
+ ///
+ [Fact]
+ public void MultipleDisposableWithItemsDispose()
+ {
+ var disposable = new MultipleDisposable();
+ disposable.Add(Disposable.Empty);
+ var disposed = 0;
+
+ // create a disposable that will be disposed when the MultipleDisposable is disposed
+ var singleDisposable = Disposable.Empty.DisposeWith(() => disposed++);
+
+ // add the disposable to the MultipleDisposable
+ singleDisposable?.DisposeWith(disposable);
+
+ var singleDisposable2 = Disposable.Empty.DisposeWith();
+ singleDisposable2?.DisposeWith(disposable);
+
+ disposable.Dispose();
+ Assert.True(disposable.IsDisposed);
+ Assert.True(singleDisposable?.IsDisposed);
+ Assert.True(singleDisposable2?.IsDisposed);
+ Assert.Equal(1, disposed);
}
}
diff --git a/src/Minimalist.Reactive.Tests/DummyDisposable.cs b/src/Minimalist.Reactive.Tests/DummyDisposable.cs
new file mode 100644
index 0000000..97ce632
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/DummyDisposable.cs
@@ -0,0 +1,16 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+#if NET48
+#endif
+
+namespace Minimalist.Reactive.Tests;
+
+internal class DummyDisposable : IDisposable
+{
+ public static readonly DummyDisposable Instance = new();
+
+ public void Dispose() => throw new NotImplementedException();
+}
diff --git a/src/Minimalist.Reactive.Tests/Minimalist.Reactive.Tests.csproj b/src/Minimalist.Reactive.Tests/Minimalist.Reactive.Tests.csproj
index 5110094..213c4bf 100644
--- a/src/Minimalist.Reactive.Tests/Minimalist.Reactive.Tests.csproj
+++ b/src/Minimalist.Reactive.Tests/Minimalist.Reactive.Tests.csproj
@@ -1,16 +1,16 @@
-
+
- net6.0;net48
+ net48;net6.0;net7.0
enable
false
preview
-
+
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Minimalist.Reactive.Tests/ReplaySignalTests.cs b/src/Minimalist.Reactive.Tests/ReplaySignalTests.cs
new file mode 100644
index 0000000..3795714
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/ReplaySignalTests.cs
@@ -0,0 +1,239 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using Minimalist.Reactive.Concurrency;
+using Minimalist.Reactive.Signals;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// ReplaySignalTests.
+///
+public class ReplaySignalTests
+{
+ ///
+ /// Constructors the argument checking.
+ ///
+ [Fact]
+ public void Constructor_ArgumentChecking()
+ {
+ Assert.Throws(() => new ReplaySignal(-1));
+ Assert.Throws(() => new ReplaySignal(-1, EmptyScheduler.Instance));
+ Assert.Throws(() => new ReplaySignal(-1, TimeSpan.Zero));
+ Assert.Throws(() => new ReplaySignal(-1, TimeSpan.Zero, EmptyScheduler.Instance));
+
+ Assert.Throws(() => new ReplaySignal(TimeSpan.FromTicks(-1)));
+ Assert.Throws(() => new ReplaySignal(TimeSpan.FromTicks(-1), EmptyScheduler.Instance));
+ Assert.Throws(() => new ReplaySignal(0, TimeSpan.FromTicks(-1)));
+ Assert.Throws(() => new ReplaySignal(0, TimeSpan.FromTicks(-1), EmptyScheduler.Instance));
+
+ Assert.Throws(() => new ReplaySignal(null!));
+ Assert.Throws(() => new ReplaySignal(0, null!));
+ Assert.Throws(() => new ReplaySignal(TimeSpan.Zero, null!));
+ Assert.Throws(() => new ReplaySignal(0, TimeSpan.Zero, null!));
+
+ // zero allowed
+ new ReplaySignal(0);
+ new ReplaySignal(TimeSpan.Zero);
+ new ReplaySignal(0, TimeSpan.Zero);
+ new ReplaySignal(0, EmptyScheduler.Instance);
+ new ReplaySignal(TimeSpan.Zero, EmptyScheduler.Instance);
+ new ReplaySignal(0, TimeSpan.Zero, EmptyScheduler.Instance);
+ }
+
+ ///
+ /// Determines whether this instance has observers.
+ ///
+ [Fact]
+ public void HasObservers()
+ {
+ HasObserversImpl(new ReplaySignal());
+ HasObserversImpl(new ReplaySignal(1));
+ HasObserversImpl(new ReplaySignal(3));
+ HasObserversImpl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Determines whether [has observers dispose1].
+ ///
+ [Fact]
+ public void HasObservers_Dispose1()
+ {
+ HasObservers_Dispose1Impl(new ReplaySignal());
+ HasObservers_Dispose1Impl(new ReplaySignal(1));
+ HasObservers_Dispose1Impl(new ReplaySignal(3));
+ HasObservers_Dispose1Impl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Determines whether [has observers dispose2].
+ ///
+ [Fact]
+ public void HasObservers_Dispose2()
+ {
+ HasObservers_Dispose2Impl(new ReplaySignal());
+ HasObservers_Dispose2Impl(new ReplaySignal(1));
+ HasObservers_Dispose2Impl(new ReplaySignal(3));
+ HasObservers_Dispose2Impl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Determines whether [has observers dispose3].
+ ///
+ [Fact]
+ public void HasObservers_Dispose3()
+ {
+ HasObservers_Dispose3Impl(new ReplaySignal());
+ HasObservers_Dispose3Impl(new ReplaySignal(1));
+ HasObservers_Dispose3Impl(new ReplaySignal(3));
+ HasObservers_Dispose3Impl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Determines whether [has observers on completed].
+ ///
+ [Fact]
+ public void HasObservers_OnCompleted()
+ {
+ HasObservers_OnCompletedImpl(new ReplaySignal());
+ HasObservers_OnCompletedImpl(new ReplaySignal(1));
+ HasObservers_OnCompletedImpl(new ReplaySignal(3));
+ HasObservers_OnCompletedImpl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Determines whether [has observers on error].
+ ///
+ [Fact]
+ public void HasObservers_OnError()
+ {
+ HasObservers_OnErrorImpl(new ReplaySignal());
+ HasObservers_OnErrorImpl(new ReplaySignal(1));
+ HasObservers_OnErrorImpl(new ReplaySignal(3));
+ HasObservers_OnErrorImpl(new ReplaySignal(TimeSpan.FromSeconds(1)));
+ }
+
+ ///
+ /// Called when [error argument checking].
+ ///
+ [Fact]
+ public void OnError_ArgumentChecking()
+ {
+ Assert.Throws(() => new ReplaySignal().OnError(null!));
+ Assert.Throws(() => new ReplaySignal(1).OnError(null!));
+ Assert.Throws(() => new ReplaySignal(2).OnError(null!));
+ Assert.Throws(() => new ReplaySignal(EmptyScheduler.Instance).OnError(null!));
+ }
+
+ ///
+ /// Subscribes the argument checking.
+ ///
+ [Fact]
+ public void Subscribe_ArgumentChecking()
+ {
+ Assert.Throws(() => new ReplaySignal().Subscribe(null!));
+ Assert.Throws(() => new ReplaySignal(1).Subscribe(null!));
+ Assert.Throws(() => new ReplaySignal(2).Subscribe(null!));
+ Assert.Throws(() => new ReplaySignal(EmptyScheduler.Instance).Subscribe(null!));
+ }
+
+ private static void HasObservers_Dispose1Impl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ private static void HasObservers_Dispose2Impl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ d.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ private static void HasObservers_Dispose3Impl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+ Assert.False(s.IsDisposed);
+
+ s.Dispose();
+ Assert.False(s.HasObservers);
+ Assert.True(s.IsDisposed);
+ }
+
+ private static void HasObservers_OnCompletedImpl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnCompleted();
+ Assert.False(s.HasObservers);
+ }
+
+ private static void HasObservers_OnErrorImpl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+
+ var d = s.Subscribe(_ => { }, _ => { });
+ Assert.True(s.HasObservers);
+
+ s.OnNext(42);
+ Assert.True(s.HasObservers);
+
+ s.OnError(new Exception());
+ Assert.False(s.HasObservers);
+ }
+
+ private static void HasObserversImpl(ReplaySignal s)
+ {
+ Assert.False(s.HasObservers);
+
+ var d1 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d1.Dispose();
+ Assert.False(s.HasObservers);
+
+ var d2 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ var d3 = s.Subscribe(_ => { });
+ Assert.True(s.HasObservers);
+
+ d2.Dispose();
+ Assert.True(s.HasObservers);
+
+ d3.Dispose();
+ Assert.False(s.HasObservers);
+ }
+}
diff --git a/src/Minimalist.Reactive.Tests/SignalCreateTests.cs b/src/Minimalist.Reactive.Tests/SignalCreateTests.cs
new file mode 100644
index 0000000..7350037
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/SignalCreateTests.cs
@@ -0,0 +1,124 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Minimalist.Reactive.Disposables;
+using Minimalist.Reactive.Signals;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// SignalsCreateTests.
+///
+public class SignalCreateTests
+{
+ ///
+ /// Creates the argument checking.
+ ///
+ [Fact]
+ public void Create_ArgumentChecking()
+ {
+ Assert.Throws(() => Signal.Create(default(Func, IDisposable>)));
+
+ Assert.Throws(() => Signal.Create(default).Subscribe(null));
+ }
+
+ ///
+ /// Creates the null coalescing action.
+ ///
+ [Fact]
+ public void Create_NullCoalescingAction()
+ {
+ var xs = Signal.Create(o =>
+ {
+ o.OnNext(42);
+ return Disposable.Create(default!);
+ });
+
+ var lst = new List();
+ var d = xs.Subscribe(lst.Add);
+ d.Dispose();
+
+ Assert.True(lst.SequenceEqual(new[] { 42 }));
+ }
+
+ ///
+ /// Creates the exception.
+ ///
+ [Fact]
+ public void Create_Exception() =>
+ Assert.Throws(() =>
+ Signal.Create(new Func, IDisposable>(_ => throw new InvalidOperationException())).Subscribe());
+
+ ///
+ /// Creates the observer throws.
+ ///
+ [Fact]
+ public void Create_ObserverThrows()
+ {
+ Assert.Throws(() =>
+ Signal.Create(o =>
+ {
+ o.OnNext(1);
+ return Disposable.Empty;
+ }).Subscribe(x => { throw new InvalidOperationException(); }));
+ Assert.Throws(() =>
+ Signal.Create(o =>
+ {
+ o.OnError(new Exception());
+ return Disposable.Empty;
+ }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
+ Assert.Throws(() =>
+ Signal.Create(o =>
+ {
+ o.OnCompleted();
+ return Disposable.Empty;
+ }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
+ }
+
+ ///
+ /// Creates the with disposable argument checking.
+ ///
+ [Fact]
+ public void CreateWithDisposable_ArgumentChecking()
+ {
+ Assert.Throws(() => Signal.Create(default(Func, IDisposable>)));
+ Assert.Throws(() => Signal.Create(_ => DummyDisposable.Instance).Subscribe(null));
+ Assert.Throws(() => Signal.Create(o =>
+ {
+ o.OnError(null);
+ return DummyDisposable.Instance;
+ }).Subscribe(null));
+ }
+
+ ///
+ /// Creates the with disposable null coalescing action.
+ ///
+ [Fact]
+ public void CreateWithDisposable_NullCoalescingAction()
+ {
+ var xs = Signal.Create(o =>
+ {
+ o.OnNext(42);
+ return default!;
+ });
+
+ var lst = new List();
+ var d = xs.Subscribe(lst.Add);
+ d.Dispose();
+
+ Assert.True(lst.SequenceEqual(new[] { 42 }));
+ }
+
+ ///
+ /// Creates the with disposable exception.
+ ///
+ [Fact]
+ public void CreateWithDisposable_Exception() =>
+ Assert.Throws(() =>
+ Signal.Create(new Func, IDisposable>(_ => throw new InvalidOperationException())).Subscribe());
+}
diff --git a/src/Minimalist.Reactive.Tests/SignalFromTaskTest.cs b/src/Minimalist.Reactive.Tests/SignalFromTaskTest.cs
new file mode 100644
index 0000000..041ab59
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/SignalFromTaskTest.cs
@@ -0,0 +1,599 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Minimalist.Reactive.Signals;
+using Xunit;
+
+namespace Minimalist.Reactive.Tests;
+
+///
+/// SignalFromTaskTest.
+///
+public class SignalFromTaskTest
+{
+ ///
+ /// Signals from task handles user exceptions.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTaskHandlesUserExceptions()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ throw new Exception("break execution");
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ await Task.Delay(10000).ConfigureAwait(true);
+ cancel.Dispose();
+
+ // Wait 6000 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Exception Should Be here", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "finished command Normally")
+ //// (2, "Exception Should Be here")
+ //// (3, "Should always come here.")
+ }
+
+ ///
+ /// Signals from task handles cancellation.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTaskHandlesCancellation()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+ cancel.Dispose();
+
+ // Wait 6000 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.Contains("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "starting cancelling command")
+ //// (2, "Should always come here.")
+ //// (3, "finished cancelling command")
+ }
+
+ ///
+ /// Signals from task handles token cancellation.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTaskHandlesTokenCancellation()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(1000, cts.Token).HandleCancellation();
+ _ = Task.Run(async () =>
+ {
+ // Wait for 1s then cancel
+ await Task.Delay(1000);
+ cts.Cancel();
+ });
+ await Task.Delay(5000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ // Wait 8000 ms to allow execution and cleanup to complete
+ await Task.Delay(8000).ConfigureAwait(false);
+
+ Assert.Contains("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "starting cancelling command")
+ //// (2, "Should always come here.")
+ //// (3, "finished cancelling command")
+ }
+
+ ///
+ /// Signals from task handles cancellation in base.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTaskHandlesCancellationInBase()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ var ex = new Exception();
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).ConfigureAwait(true);
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var cancel = fixture.Subscribe();
+ await Task.Delay(500).ConfigureAwait(true);
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+ cancel.Dispose();
+
+ // Wait 5050 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.Equal("Should always come here.", statusTrail.Last().Item2);
+
+ //// (0, "started command")
+ //// (1, "Should always come here.")
+ }
+
+ ///
+ /// Signals from task handles completion.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTaskHandlesCompletion()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // NOT EXPECTED TO ENTER HERE
+
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ // Wait 11000 ms to allow execution complete
+ await Task.Delay(11000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.Equal("Should always come here.", statusTrail.Last().Item2);
+ Assert.True(result);
+ //// (0, "started command")
+ //// (2, "finished command Normally")
+ //// (1, "Should always come here.")
+ }
+
+ ///
+ /// Signals from task t handles user exceptions.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTask_T_HandlesUserExceptions()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ throw new Exception("break execution");
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ await Task.Delay(10000).ConfigureAwait(true);
+ cancel.Dispose();
+
+ // Wait 6000 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Exception Should Be here", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "finished command Normally")
+ //// (2, "Exception Should Be here")
+ //// (3, "Should always come here.")
+ }
+
+ ///
+ /// Signals from task t handles cancellation.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTask_T_HandlesCancellation()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+ cancel.Dispose();
+
+ // Wait 6000 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.Contains("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "starting cancelling command")
+ //// (3, "Should always come here.")
+ //// (2, "finished cancelling command")
+ }
+
+ ///
+ /// Signals from task t handles token cancellation.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTask_T_HandlesTokenCancellation()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(1000, cts.Token).HandleCancellation();
+ _ = Task.Run(async () =>
+ {
+ // Wait for 1s then cancel
+ await Task.Delay(1000);
+ cts.Cancel();
+ });
+ await Task.Delay(5000, cts.Token).HandleCancellation(async () =>
+ {
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ // Wait 8000 ms to allow execution and cleanup to complete
+ await Task.Delay(8000).ConfigureAwait(false);
+
+ Assert.Contains("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("Should always come here.", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.False(result);
+ //// (0, "started command")
+ //// (1, "starting cancelling command")
+ //// (2, "Should always come here.")
+ //// (3, "finished cancelling command")
+ }
+
+ ///
+ /// Signals from task t handles cancellation in base.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTask_T_HandlesCancellationInBase()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ var ex = new Exception();
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).ConfigureAwait(true);
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var cancel = fixture.Subscribe();
+ await Task.Delay(500).ConfigureAwait(true);
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+ cancel.Dispose();
+
+ // Wait 5050 ms to allow execution and cleanup to complete
+ await Task.Delay(6000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.Equal("Should always come here.", statusTrail.Last().Item2);
+
+ //// (0, "started command")
+ //// (1, "Should always come here.")
+ }
+
+ ///
+ /// Signals from task t handles completion.
+ ///
+ /// A representing the asynchronous unit test.
+ [Fact]
+ public async Task SignalFromTask_T_HandlesCompletion()
+ {
+ var statusTrail = new List<(int, string)>();
+ var position = 0;
+ Exception? exception = null;
+ var fixture = Signal.FromTask(
+ async (cts) =>
+ {
+ statusTrail.Add((position++, "started command"));
+ await Task.Delay(10000, cts.Token).HandleCancellation(async () =>
+ {
+ // NOT EXPECTED TO ENTER HERE
+
+ // User Handles cancellation.
+ statusTrail.Add((position++, "starting cancelling command"));
+
+ // dummy cleanup
+ await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
+ statusTrail.Add((position++, "finished cancelling command"));
+ }).ConfigureAwait(true);
+
+ if (!cts.IsCancellationRequested)
+ {
+ statusTrail.Add((position++, "finished command Normally"));
+ }
+
+ return RxVoid.Default;
+ }).Catch(
+ ex =>
+ {
+ exception = ex;
+ statusTrail.Add((position++, "Exception Should Be here"));
+ return Signal.Throw(ex);
+ }).Finally(() => statusTrail.Add((position++, "Should always come here.")));
+
+ var result = false;
+ var cancel = fixture.Subscribe(_ => result = true);
+ await Task.Delay(500).ConfigureAwait(true);
+
+ Assert.Contains("started command", statusTrail.Select(x => x.Item2));
+
+ // Wait 11000 ms to allow execution complete
+ await Task.Delay(11000).ConfigureAwait(false);
+
+ Assert.DoesNotContain("starting cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.DoesNotContain("finished cancelling command", statusTrail.Select(x => x.Item2));
+ Assert.Contains("finished command Normally", statusTrail.Select(x => x.Item2));
+ Assert.Equal("Should always come here.", statusTrail.Last().Item2);
+ Assert.True(result);
+ //// (0, "started command")
+ //// (2, "finished command Normally")
+ //// (1, "Should always come here.")
+ }
+}
diff --git a/src/Minimalist.Reactive.Tests/SignalTests.cs b/src/Minimalist.Reactive.Tests/SignalTests.cs
index b0f176b..0b7aa37 100644
--- a/src/Minimalist.Reactive.Tests/SignalTests.cs
+++ b/src/Minimalist.Reactive.Tests/SignalTests.cs
@@ -1,369 +1,369 @@
-// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
+using Minimalist.Reactive.Signals;
using Xunit;
-namespace Minimalist.Reactive.Tests
+namespace Minimalist.Reactive.Tests;
+
+///
+/// SubjectTests.
+///
+public class SignalTests
{
///
- /// SubjectTests.
+ /// Called when [next].
+ ///
+ [Fact]
+ public void OnNext()
+ {
+ var subject = new Signal();
+ var value = 0;
+
+ var subscription = subject.Subscribe(i => value += i);
+
+ subject.OnNext(1);
+ Assert.Equal(1, value);
+
+ subject.OnNext(1);
+ Assert.Equal(2, value);
+
+ subscription.Dispose();
+
+ subject.OnNext(1);
+ Assert.Equal(2, value);
+ }
+
+ ///
+ /// Called when [next disposed].
+ ///
+ [Fact]
+ public void OnNextDisposed()
+ {
+ var subject = new Signal();
+
+ subject.Dispose();
+
+ Assert.Throws(() => subject.OnNext(1));
+ }
+
+ ///
+ /// Called when [next disposed subscriber].
+ ///
+ [Fact]
+ public void OnNextDisposedSubscriber()
+ {
+ var subject = new Signal();
+ var value = 0;
+
+ subject.Subscribe(i => value += i).Dispose();
+
+ subject.OnNext(1);
+
+ Assert.Equal(0, value);
+ }
+
+ ///
+ /// Called when [completed].
+ ///
+ [Fact]
+ public void OnCompleted()
+ {
+ var subject = new Signal();
+ var completed = false;
+
+ var subscription = subject.Subscribe(_ => { }, () => completed = true);
+
+ subject.OnCompleted();
+
+ Assert.True(completed);
+ }
+
+ ///
+ /// Called when [completed no op].
+ ///
+ [Fact]
+ public void OnCompleted_NoErrors()
+ {
+ var subject = new Signal();
+
+ var subscription = subject.Subscribe(_ => { });
+
+ subject.OnCompleted();
+ }
+
+ ///
+ /// Called when [completed once].
+ ///
+ [Fact]
+ public void OnCompletedOnce()
+ {
+ var subject = new Signal();
+ var completed = 0;
+
+ var subscription = subject.Subscribe(_ => { }, () => completed++);
+
+ subject.OnCompleted();
+
+ Assert.Equal(1, completed);
+
+ subject.OnCompleted();
+
+ Assert.Equal(1, completed);
+ }
+
+ ///
+ /// Called when [completed disposed].
+ ///
+ [Fact]
+ public void OnCompletedDisposed()
+ {
+ var subject = new Signal();
+
+ subject.Dispose();
+
+ Assert.Throws(() => subject.OnCompleted());
+ }
+
+ ///
+ /// Called when [completed disposed subscriber].
+ ///
+ [Fact]
+ public void OnCompletedDisposedSubscriber()
+ {
+ var subject = new Signal();
+ var completed = false;
+
+ subject.Subscribe(_ => { }, () => completed = true).Dispose();
+
+ subject.OnCompleted();
+
+ Assert.False(completed);
+ }
+
+ ///
+ /// Called when [error].
///
- public class SignalTests
+ [Fact]
+ public void OnError()
{
- ///
- /// Called when [next].
- ///
- [Fact]
- public void OnNext()
- {
- var subject = new Signal();
- var value = 0;
+ var subject = new Signal();
+ var error = false;
- var subscription = subject.Subscribe(i => value += i);
+ var subscription = subject.Subscribe(_ => { }, _ => error = true);
- subject.OnNext(1);
- Assert.Equal(1, value);
+ subject.OnError(new Exception());
- subject.OnNext(1);
- Assert.Equal(2, value);
+ Assert.True(error);
+ }
- subscription.Dispose();
+ ///
+ /// Called when [error once].
+ ///
+ [Fact]
+ public void OnErrorOnce()
+ {
+ var subject = new Signal();
+ var errors = 0;
- subject.OnNext(1);
- Assert.Equal(2, value);
- }
+ var subscription = subject.Subscribe(_ => { }, _ => errors++);
- ///
- /// Called when [next disposed].
- ///
- [Fact]
- public void OnNextDisposed()
- {
- var subject = new Signal();
+ subject.OnError(new Exception());
- subject.Dispose();
+ Assert.Equal(1, errors);
- Assert.Throws(() => subject.OnNext(1));
- }
-
- ///
- /// Called when [next disposed subscriber].
- ///
- [Fact]
- public void OnNextDisposedSubscriber()
- {
- var subject = new Signal();
- var value = 0;
+ subject.OnError(new Exception());
- subject.Subscribe(i => value += i).Dispose();
+ Assert.Equal(1, errors);
+ }
- subject.OnNext(1);
+ ///
+ /// Called when [error disposed].
+ ///
+ [Fact]
+ public void OnErrorDisposed()
+ {
+ var subject = new Signal();
- Assert.Equal(0, value);
- }
-
- ///
- /// Called when [completed].
- ///
- [Fact]
- public void OnCompleted()
- {
- var subject = new Signal();
- var completed = false;
+ subject.Dispose();
- var subscription = subject.Subscribe(_ => { }, () => completed = true);
+ Assert.Throws(() => subject.OnError(new Exception()));
+ }
- subject.OnCompleted();
+ ///
+ /// Called when [error disposed subscriber].
+ ///
+ [Fact]
+ public void OnErrorDisposedSubscriber()
+ {
+ var subject = new Signal();
+ var error = false;
- Assert.True(completed);
- }
+ subject.Subscribe(_ => { }, _ => error = true).Dispose();
- ///
- /// Called when [completed no op].
- ///
- [Fact]
- public void OnCompleted_NoErrors()
- {
- var subject = new Signal();
+ subject.OnError(new Exception());
+
+ Assert.False(error);
+ }
+
+ ///
+ /// Called when [error rethrows by default].
+ ///
+ [Fact]
+ public void OnErrorRethrowsByDefault()
+ {
+ var subject = new Signal();
+
+ var subs = subject.Subscribe(_ => { });
+
+ Assert.Throws(() => subject.OnError(new ArgumentException()));
+ }
+
+ ///
+ /// Called when [error null throws].
+ ///
+ [Fact]
+ public void OnErrorNullThrows() =>
+ Assert.Throws(() => new Signal().OnError(null!));
+
+ ///
+ /// Subscribes the null throws.
+ ///
+ [Fact]
+ public void SubscribeNullThrows() =>
+ Assert.Throws(() => new Signal().Subscribe(null!));
- var subscription = subject.Subscribe(_ => { });
+ ///
+ /// Subscribes the disposed throws.
+ ///
+ [Fact]
+ public void SubscribeDisposedThrows()
+ {
+ var subject = new Signal();
- subject.OnCompleted();
- }
+ subject.Dispose();
- ///
- /// Called when [completed once].
- ///
- [Fact]
- public void OnCompletedOnce()
- {
- var subject = new Signal();
- var completed = 0;
+ Assert.Throws(() => subject.Subscribe(_ => { }));
+ }
- var subscription = subject.Subscribe(_ => { }, () => completed++);
+ ///
+ /// Subscribes the on completed.
+ ///
+ [Fact]
+ public void SubscribeOnCompleted()
+ {
+ var subject = new Signal();
+ subject.OnCompleted();
+ var completed = false;
- subject.OnCompleted();
+ subject.Subscribe(_ => { }, () => completed = true).Dispose();
- Assert.Equal(1, completed);
+ Assert.True(completed);
+ }
- subject.OnCompleted();
+ ///
+ /// Subscribes the on error.
+ ///
+ [Fact]
+ public void SubscribeOnError()
+ {
+ var subject = new Signal();
+ subject.OnError(new Exception());
+ var error = false;
- Assert.Equal(1, completed);
- }
+ subject.Subscribe(_ => { }, _ => error = true);
- ///
- /// Called when [completed disposed].
- ///
- [Fact]
- public void OnCompletedDisposed()
- {
- var subject = new Signal();
+ Assert.True(error);
+ }
- subject.Dispose();
+ ///
+ /// Subjects the where.
+ ///
+ [Fact]
+ public void SubjectWhere()
+ {
+ var subject = new Signal();
+ subject.Where(i => i % 2 == 0).Subscribe(i => Assert.Equal(2, i));
+ subject.OnNext(1);
+ subject.OnNext(2);
+ subject.OnNext(3);
+ subject.Dispose();
+ }
- Assert.Throws(() => subject.OnCompleted());
- }
+ ///
+ /// Subjects the select.
+ ///
+ [Fact]
+ public void SubjectSelect()
+ {
+ var subject = new Signal();
+ subject.Select(i => i * 2).Subscribe(i => Assert.Equal(4, i));
+ subject.OnNext(2);
+ subject.Dispose();
+ }
- ///
- /// Called when [completed disposed subscriber].
- ///
- [Fact]
- public void OnCompletedDisposedSubscriber()
- {
- var subject = new Signal();
- var completed = false;
+ ///
+ /// Subjects the buffer.
+ ///
+ [Fact]
+ public void SubjectBuffer()
+ {
+ var subject = new Signal();
+ var result = new List();
+ subject.Buffer(2).Subscribe(i => result = i.ToList());
+ subject.OnNext(1);
+ subject.OnNext(2);
+ Assert.Equal(new[] { 1, 2 }, result);
+ subject.OnNext(3);
+ subject.OnNext(4);
+ Assert.Equal(new[] { 3, 4 }, result);
+ subject.OnNext(5);
+ subject.OnNext(6);
+ Assert.Equal(new[] { 5, 6 }, result);
+ subject.Dispose();
+ }
- subject.Subscribe(_ => { }, () => completed = true).Dispose();
-
- subject.OnCompleted();
-
- Assert.False(completed);
- }
+ ///
+ /// Subjects the buffer skip2.
+ ///
+ [Fact]
+ public void SubjectBufferTake2Skip2()
+ {
+ var subject = new Signal();
+ var result = new List();
+ subject.Buffer(2, 2).Subscribe(i => result = i.ToList());
+ subject.OnNext(1);
+ subject.OnNext(2);
+ Assert.Equal(new[] { 1, 2 }, result);
+ subject.OnNext(3);
+ subject.OnNext(4);
+ Assert.Equal(new[] { 1, 2 }, result);
+ subject.OnNext(5);
+ subject.OnNext(6);
+ Assert.Equal(new[] { 5, 6 }, result);
+ subject.OnNext(7);
+ subject.OnNext(8);
+ Assert.Equal(new[] { 5, 6 }, result);
+ subject.Dispose();
+ }
- ///
- /// Called when [error].
- ///
- [Fact]
- public void OnError()
- {
- var subject = new Signal();
- var error = false;
-
- var subscription = subject.Subscribe(_ => { }, e => error = true);
-
- subject.OnError(new Exception());
-
- Assert.True(error);
- }
-
- ///
- /// Called when [error once].
- ///
- [Fact]
- public void OnErrorOnce()
- {
- var subject = new Signal();
- var errors = 0;
-
- var subscription = subject.Subscribe(_ => { }, e => errors++);
-
- subject.OnError(new Exception());
-
- Assert.Equal(1, errors);
-
- subject.OnError(new Exception());
-
- Assert.Equal(1, errors);
- }
-
- ///
- /// Called when [error disposed].
- ///
- [Fact]
- public void OnErrorDisposed()
- {
- var subject = new Signal();
-
- subject.Dispose();
-
- Assert.Throws(() => subject.OnError(new Exception()));
- }
-
- ///
- /// Called when [error disposed subscriber].
- ///
- [Fact]
- public void OnErrorDisposedSubscriber()
- {
- var subject = new Signal();
- var error = false;
-
- subject.Subscribe(_ => { }, e => error = true).Dispose();
-
- subject.OnError(new Exception());
-
- Assert.False(error);
- }
-
- ///
- /// Called when [error rethrows by default].
- ///
- [Fact]
- public void OnErrorRethrowsByDefault()
- {
- var subject = new Signal();
-
- var subs = subject.Subscribe(_ => { });
-
- Assert.Throws(() => subject.OnError(new ArgumentException()));
- }
-
- ///
- /// Called when [error null throws].
- ///
- [Fact]
- public void OnErrorNullThrows() =>
- Assert.Throws(() => new Signal().OnError(null!));
-
- ///
- /// Subscribes the null throws.
- ///
- [Fact]
- public void SubscribeNullThrows() =>
- Assert.Throws(() => new Signal().Subscribe(null!));
-
- ///
- /// Subscribes the disposed throws.
- ///
- [Fact]
- public void SubscribeDisposedThrows()
- {
- var subject = new Signal();
-
- subject.Dispose();
-
- Assert.Throws(() => subject.Subscribe(_ => { }));
- }
-
- ///
- /// Subscribes the on completed.
- ///
- [Fact]
- public void SubscribeOnCompleted()
- {
- var subject = new Signal();
- subject.OnCompleted();
- var completed = false;
-
- subject.Subscribe(_ => { }, () => completed = true).Dispose();
-
- Assert.True(completed);
- }
-
- ///
- /// Subscribes the on error.
- ///
- [Fact]
- public void SubscribeOnError()
- {
- var subject = new Signal();
- subject.OnError(new Exception());
- var error = false;
-
- subject.Subscribe(_ => { }, e => error = true);
-
- Assert.True(error);
- }
-
- ///
- /// Subjects the where.
- ///
- [Fact]
- public void SubjectWhere()
- {
- var subject = new Signal();
- subject.Where(i => i % 2 == 0).Subscribe(i => Assert.Equal(2, i));
- subject.OnNext(1);
- subject.OnNext(2);
- subject.OnNext(3);
- subject.Dispose();
- }
-
- ///
- /// Subjects the select.
- ///
- [Fact]
- public void SubjectSelect()
- {
- var subject = new Signal();
- subject.Select(i => i * 2).Subscribe(i => Assert.Equal(4, i));
- subject.OnNext(2);
- subject.Dispose();
- }
-
- ///
- /// Subjects the buffer.
- ///
- [Fact]
- public void SubjectBuffer()
- {
- var subject = new Signal();
- var result = new List();
- subject.Buffer(2).Subscribe(i => result = i.ToList());
- subject.OnNext(1);
- subject.OnNext(2);
- Assert.Equal(new[] { 1, 2 }, result);
- subject.OnNext(3);
- subject.OnNext(4);
- Assert.Equal(new[] { 3, 4 }, result);
- subject.OnNext(5);
- subject.OnNext(6);
- Assert.Equal(new[] { 5, 6 }, result);
- subject.Dispose();
- }
-
- ///
- /// Subjects the buffer skip2.
- ///
- [Fact]
- public void SubjectBufferTake2Skip2()
- {
- var subject = new Signal();
- var result = new List();
- subject.Buffer(2, 2).Subscribe(i => result = i.ToList());
- subject.OnNext(1);
- subject.OnNext(2);
- Assert.Equal(new[] { 1, 2 }, result);
- subject.OnNext(3);
- subject.OnNext(4);
- Assert.Equal(new[] { 1, 2 }, result);
- subject.OnNext(5);
- subject.OnNext(6);
- Assert.Equal(new[] { 5, 6 }, result);
- subject.OnNext(7);
- subject.OnNext(8);
- Assert.Equal(new[] { 5, 6 }, result);
- subject.Dispose();
- }
-
- ///
- /// Subjects the rx void.
- ///
- [Fact]
- public void SubjectRxVoid()
- {
- var subject = new Signal();
- var result = new List();
- subject.Subscribe(result.Add);
- subject.OnNext(RxVoid.Default);
- Assert.Equal(new[] { RxVoid.Default }, result);
- subject.OnNext(RxVoid.Default);
- Assert.Equal(new[] { RxVoid.Default, RxVoid.Default }, result);
- subject.Dispose();
- }
+ ///
+ /// Subjects the rx void.
+ ///
+ [Fact]
+ public void SubjectRxVoid()
+ {
+ var subject = new Signal();
+ var result = new List();
+ subject.Subscribe(result.Add);
+ subject.OnNext(RxVoid.Default);
+ Assert.Equal(new[] { RxVoid.Default }, result);
+ subject.OnNext(RxVoid.Default);
+ Assert.Equal(new[] { RxVoid.Default, RxVoid.Default }, result);
+ subject.Dispose();
}
}
diff --git a/src/Minimalist.Reactive.Tests/TestClasses/EmptyScheduler.cs b/src/Minimalist.Reactive.Tests/TestClasses/EmptyScheduler.cs
new file mode 100644
index 0000000..083314b
--- /dev/null
+++ b/src/Minimalist.Reactive.Tests/TestClasses/EmptyScheduler.cs
@@ -0,0 +1,23 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+
+namespace Minimalist.Reactive.Concurrency;
+
+internal sealed class EmptyScheduler : IScheduler
+{
+ public static readonly EmptyScheduler Instance = new();
+
+ public DateTimeOffset Now => DateTimeOffset.MinValue;
+
+ public IDisposable Schedule(TState state, Func action) =>
+ throw new NotImplementedException();
+
+ public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) =>
+ throw new NotImplementedException();
+
+ public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) =>
+ throw new NotImplementedException();
+}
diff --git a/src/Minimalist.Reactive/BufferSignal{T,TResult}.cs b/src/Minimalist.Reactive/BufferSignal{T,TResult}.cs
deleted file mode 100644
index 48c10b7..0000000
--- a/src/Minimalist.Reactive/BufferSignal{T,TResult}.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
-// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for full license information.
-
-namespace Minimalist.Reactive
-{
- internal class BufferSignal : Signal>
- where TResult : IList?
- {
- private readonly int _skip;
- private readonly int _count;
- private IList? _buffer;
- private int _index;
- private IDisposable? _subscription;
-
- public BufferSignal(IObservable source, int count, int skip)
- {
- _skip = skip;
- _count = count;
- _subscription = source.Subscribe(
- next =>
- {
- if (IsDisposed)
- {
- return;
- }
-
- var idx = _index;
- var buffer = _buffer;
- if (idx == 0)
- {
- // Reset buffer.
- buffer = new List();
- _buffer = buffer;
- }
-
- // Take while not skipping
- if (idx >= 0)
- {
- buffer?.Add(next);
- }
-
- if (++idx == _count)
- {
- _buffer = null;
-
- // Set the skip.
- idx = 0 - _skip;
- OnNext(buffer!);
- }
-
- _index = idx;
- },
- (ex) =>
- {
- _buffer = null;
- OnError(ex);
- },
- () =>
- {
- var buffer = _buffer;
- _buffer = null;
-
- if (buffer != null)
- {
- OnNext(buffer);
- }
-
- OnCompleted();
- });
- }
-
- protected override void Dispose(bool disposing)
- {
- if (IsDisposed)
- {
- return;
- }
-
- Dispose(disposing);
- if (disposing)
- {
- var buffer = _buffer;
- _buffer = null;
-
- if (buffer != null)
- {
- OnNext(buffer);
- }
-
- _subscription?.Dispose();
- _subscription = null;
- }
- }
- }
-}
diff --git a/src/Minimalist.Reactive/Concurrency/CurrentThreadScheduler.cs b/src/Minimalist.Reactive/Concurrency/CurrentThreadScheduler.cs
new file mode 100644
index 0000000..2ba3b54
--- /dev/null
+++ b/src/Minimalist.Reactive/Concurrency/CurrentThreadScheduler.cs
@@ -0,0 +1,206 @@
+// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.ComponentModel;
+using System.Diagnostics;
+
+namespace Minimalist.Reactive.Concurrency;
+
+///
+/// CurrentThreadScheduler.
+///
+///
+public sealed class CurrentThreadScheduler : IScheduler
+{
+ private static readonly Lazy StaticInstance = new(() => new CurrentThreadScheduler());
+
+ [ThreadStatic]
+ private static bool _running;
+
+ [ThreadStatic]
+ private static SchedulerQueue? _threadLocalQueue;
+
+ [ThreadStatic]
+ private static Stopwatch? clock;
+
+ private CurrentThreadScheduler()
+ {
+ }
+
+ ///
+ /// Gets the singleton instance of the current thread scheduler.
+ ///
+ public static CurrentThreadScheduler Instance => StaticInstance.Value;
+
+ ///
+ /// Gets a value indicating whether gets a value that indicates whether the caller must call a Schedule method.
+ ///
+ [EditorBrowsable(EditorBrowsableState.Advanced)]
+#pragma warning disable CA1822 // Mark members as static
+ public bool IsScheduleRequired => !_running;
+#pragma warning restore CA1822 // Mark members as static
+
+ ///
+ /// Gets the scheduler's notion of current time.
+ ///
+ public DateTimeOffset Now => DateTimeOffset.UtcNow;
+
+ private static TimeSpan Time
+ {
+ get
+ {
+ clock ??= Stopwatch.StartNew();
+
+ return clock.Elapsed;
+ }
+ }
+
+ ///
+ /// Schedules an action to be executed.
+ ///
+ /// The type of the state passed to the scheduled action.
+ /// State passed to the action to be executed.
+ /// Action to be executed.
+ ///
+ /// The disposable object used to cancel the scheduled action (best effort).
+ ///
+ public IDisposable Schedule(TState state, Func action)
+ {
+ if (action == null)
+ {
+ throw new ArgumentNullException(nameof(action));
+ }
+
+ return action(this, state);
+ }
+
+ ///
+ /// Schedules an action to be executed after dueTime.
+ ///
+ /// The type of the state passed to the scheduled action.
+ /// State passed to the action to be executed.
+ /// Relative time after which to execute the action.
+ /// Action to be executed.
+ ///
+ /// The disposable object used to cancel the scheduled action (best effort).
+ ///
+ /// action.
+ /// is null.
+ public IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
+ {
+ if (action == null)
+ {
+ throw new ArgumentNullException(nameof(action));
+ }
+
+ SchedulerQueue? queue;
+
+ // There is no timed task and no task is currently running
+ if (!_running)
+ {
+ _running = true;
+
+ if (dueTime > TimeSpan.Zero)
+ {
+ Thread.Sleep(dueTime);
+ }
+
+ // execute directly without queueing
+ IDisposable d;
+ try
+ {
+ d = action(this, state);
+ }
+ catch
+ {
+ SetQueue(null);
+ _running = false;
+ throw;
+ }
+
+ // did recursive tasks arrive?
+ queue = GetQueue();
+
+ // yes, run those in the queue as well
+ if (queue != null)
+ {
+ try
+ {
+ Trampoline.Run(queue);
+ }
+ finally
+ {
+ SetQueue(null);
+ _running = false;
+ }
+ }
+ else
+ {
+ _running = false;
+ }
+
+ return d;
+ }
+
+ queue = GetQueue();
+
+ // if there is a task running or there is a queue
+ if (queue == null)
+ {
+ queue = new SchedulerQueue(4);
+ SetQueue(queue);
+ }
+
+ var dt = Time + Scheduler.Normalize(dueTime);
+
+ // queue up more work
+ var si = new ScheduledItem