diff --git a/AnthonyDemo/AnthonyDemo.csproj b/AnthonyDemo/AnthonyDemo.csproj new file mode 100644 index 0000000..fef1ae2 --- /dev/null +++ b/AnthonyDemo/AnthonyDemo.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp2.2 + + + + + + + diff --git a/AnthonyDemo/AnthonyTaskRunner.cs b/AnthonyDemo/AnthonyTaskRunner.cs new file mode 100644 index 0000000..c75ddd2 --- /dev/null +++ b/AnthonyDemo/AnthonyTaskRunner.cs @@ -0,0 +1,46 @@ +using ParallelProcessPractice.Core; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; + +namespace AnthonyDemo +{ + public class AnthonyTaskRunner : TaskRunnerBase + { + private IWorker worker1; + private IWorker worker2; + private IWorker worker3; + + private void InitWorker(IEnumerable tasks) + { + int taskCount = tasks.Count(); + + worker1 = new Worker(1, new ConcurrentQueue(tasks), taskCount); + worker2 = new Worker(2, new ConcurrentQueue(), taskCount); + worker3 = new Worker(3, new ConcurrentQueue(), taskCount); + + worker1.Next = worker2; + worker2.Next = worker3; + + worker1.Receive(5); + worker2.Receive(3); + worker3.Receive(3); + } + + private void WaitAllTaskFinished() + { + while (worker1.IsFinished == false || + worker2.IsFinished == false || + worker3.IsFinished == false) + { + ; + } + } + + public override void Run(IEnumerable tasks) + { + InitWorker(tasks); + WaitAllTaskFinished(); + } + } +} \ No newline at end of file diff --git a/AnthonyDemo/IWorker.cs b/AnthonyDemo/IWorker.cs new file mode 100644 index 0000000..d436301 --- /dev/null +++ b/AnthonyDemo/IWorker.cs @@ -0,0 +1,45 @@ +using ParallelProcessPractice.Core; +using System.Collections.Concurrent; + +namespace AnthonyDemo +{ + public interface IWorker + { + /// + /// 佇列來源 + /// + ConcurrentQueue Queue { get; set; } + + /// + /// 預計作業數 + /// + int ExpectCount { get; set; } + + /// + /// 已執行作業數 + /// + int FinishedCount { get; set; } + + /// + /// 是否完成 + /// + bool IsFinished { get; } + + /// + /// 階段序號 + /// + int Step { get; set; } + + /// + /// 開始接收佇列 + /// + /// + /// + void Receive(int startCount); + + /// + /// 下一個關聯worker + /// + IWorker Next { get; set; } + } +} \ No newline at end of file diff --git a/AnthonyDemo/Program.cs b/AnthonyDemo/Program.cs new file mode 100644 index 0000000..3fc26c5 --- /dev/null +++ b/AnthonyDemo/Program.cs @@ -0,0 +1,13 @@ +using System; + +namespace AnthonyDemo +{ + internal class Program + { + private static void Main(string[] args) + { + AnthonyTaskRunner run = new AnthonyTaskRunner(); + run.ExecuteTasks(1000); + } + } +} \ No newline at end of file diff --git a/AnthonyDemo/Worker.cs b/AnthonyDemo/Worker.cs new file mode 100644 index 0000000..202b5ae --- /dev/null +++ b/AnthonyDemo/Worker.cs @@ -0,0 +1,58 @@ +using ParallelProcessPractice.Core; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace AnthonyDemo +{ + public class Worker : IWorker + { + public IWorker Next { get; set; } + + public ConcurrentQueue Queue { get; set; } + + public int Step { get; set; } + + public int ExpectCount { get; set; } + + public int FinishedCount { get; set; } + + public bool IsFinished { get => ExpectCount == FinishedCount; } + + private object _lock = new object(); + + public Worker(int step, ConcurrentQueue queue, int expectCount) + { + this.Queue = queue; + this.Step = step; + this.ExpectCount = expectCount; + } + + public void Receive(int startCount) + { + for (int index = 1; index <= startCount; index++) + { + Task.Run(() => + { + while (true) + { + if (Queue.TryDequeue(out MyTask item)) + { + item.DoStepN(Step); + + lock (_lock) + { + FinishedCount++; + } + + if (Next != null) + { + Next.Queue.Enqueue(item); + } + } + } + }); + } + } + } +} \ No newline at end of file diff --git a/Benchmark/Benchmark.csproj b/Benchmark/Benchmark.csproj index ea0e9b3..24a58f7 100644 --- a/Benchmark/Benchmark.csproj +++ b/Benchmark/Benchmark.csproj @@ -12,6 +12,7 @@ + diff --git a/ParallelProcessPractice.sln b/ParallelProcessPractice.sln index 387a9fb..77a4183 100644 --- a/ParallelProcessPractice.sln +++ b/ParallelProcessPractice.sln @@ -47,6 +47,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NathanDemo", "NathanDemo\Na EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JolinDemo", "JolinDemo\JolinDemo.csproj", "{C58EA4C0-FFEC-43D6-97ED-EF755AE14B64}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AnthonyDemo", "AnthonyDemo\AnthonyDemo.csproj", "{7B76A23F-BD8D-41C7-8207-9EB72BC54898}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -117,6 +119,10 @@ Global {C58EA4C0-FFEC-43D6-97ED-EF755AE14B64}.Debug|Any CPU.Build.0 = Debug|Any CPU {C58EA4C0-FFEC-43D6-97ED-EF755AE14B64}.Release|Any CPU.ActiveCfg = Release|Any CPU {C58EA4C0-FFEC-43D6-97ED-EF755AE14B64}.Release|Any CPU.Build.0 = Release|Any CPU + {7B76A23F-BD8D-41C7-8207-9EB72BC54898}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7B76A23F-BD8D-41C7-8207-9EB72BC54898}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7B76A23F-BD8D-41C7-8207-9EB72BC54898}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7B76A23F-BD8D-41C7-8207-9EB72BC54898}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -136,6 +142,7 @@ Global {1B34613F-9384-4365-A54E-D9719D6E902B} = {B21D6D24-8EC2-497F-AE16-E0155FEE28CE} {CD021A28-2C35-4FAA-BFDC-3E4543F009A0} = {B21D6D24-8EC2-497F-AE16-E0155FEE28CE} {C58EA4C0-FFEC-43D6-97ED-EF755AE14B64} = {B21D6D24-8EC2-497F-AE16-E0155FEE28CE} + {7B76A23F-BD8D-41C7-8207-9EB72BC54898} = {B21D6D24-8EC2-497F-AE16-E0155FEE28CE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {15051360-3A56-4052-A944-97C62F90EEC6}