1
1
using System ;
2
- using System . Collections . Generic ;
3
- using System . Runtime . CompilerServices ;
2
+ using System . Collections . Concurrent ;
3
+ using System . Threading ;
4
4
using System . Threading . Tasks ;
5
5
using DotnetSpider . Http ;
6
6
using HWT ;
@@ -9,66 +9,55 @@ namespace DotnetSpider.Infrastructure
9
9
{
10
10
public class RequestedQueue : IDisposable
11
11
{
12
- private readonly Dictionary < string , Request > _dict ;
13
-
12
+ private readonly ConcurrentDictionary < string , Request > _dict ;
14
13
private readonly HashedWheelTimer _timer ;
15
-
16
- private readonly List < Request > _queue ;
14
+ private ConcurrentBag < Request > _queue ;
17
15
18
16
public RequestedQueue ( )
19
17
{
20
- _dict = new Dictionary < string , Request > ( ) ;
21
- _queue = new List < Request > ( ) ;
18
+ _dict = new ConcurrentDictionary < string , Request > ( ) ;
19
+ _queue = new ConcurrentBag < Request > ( ) ;
22
20
_timer = new HashedWheelTimer ( TimeSpan . FromSeconds ( 1 )
23
- , ticksPerWheel : 100000
24
- , maxPendingTimeouts : 0 ) ;
21
+ , 100000 ) ;
25
22
}
26
23
27
24
public int Count => _dict . Count ;
28
25
29
- [ MethodImpl ( MethodImplOptions . Synchronized ) ]
30
26
public bool Enqueue ( Request request )
31
27
{
32
28
if ( request . Timeout <= 2000 )
33
29
{
34
30
throw new SpiderException ( "Timeout should not less than 2000 milliseconds" ) ;
35
31
}
36
32
37
- if ( ! _dict . ContainsKey ( request . Hash ) )
33
+ if ( ! _dict . TryAdd ( request . Hash , request ) )
38
34
{
39
- _dict . Add ( request . Hash , request ) ;
40
- _timer . NewTimeout ( new TimeoutTask ( this , request . Hash ) ,
41
- TimeSpan . FromMilliseconds ( request . Timeout ) ) ;
42
- return true ;
35
+ return false ;
43
36
}
44
37
45
- return false ;
38
+ _timer . NewTimeout ( new TimeoutTask ( this , request . Hash ) ,
39
+ TimeSpan . FromMilliseconds ( request . Timeout ) ) ;
40
+ return true ;
46
41
}
47
42
48
- [ MethodImpl ( MethodImplOptions . Synchronized ) ]
43
+
49
44
public Request Dequeue ( string hash )
50
45
{
51
- var request = _dict [ hash ] ;
52
- _dict . Remove ( hash ) ;
53
- return request ;
46
+ return _dict . TryRemove ( hash , out var request ) ? request : null ;
54
47
}
55
48
56
- [ MethodImpl ( MethodImplOptions . Synchronized ) ]
57
49
public Request [ ] GetAllTimeoutList ( )
58
50
{
59
51
var data = _queue . ToArray ( ) ;
60
- _queue . Clear ( ) ;
52
+ Interlocked . Exchange ( ref _queue , new ConcurrentBag < Request > ( ) ) ;
61
53
return data ;
62
54
}
63
55
64
- [ MethodImpl ( MethodImplOptions . Synchronized ) ]
65
56
private void Timeout ( string hash )
66
57
{
67
- if ( _dict . ContainsKey ( hash ) )
58
+ if ( _dict . TryRemove ( hash , out var request ) )
68
59
{
69
- var request = _dict [ hash ] ;
70
60
_queue . Add ( request ) ;
71
- _dict . Remove ( hash ) ;
72
61
}
73
62
}
74
63
@@ -93,7 +82,7 @@ public Task RunAsync(ITimeout timeout)
93
82
public void Dispose ( )
94
83
{
95
84
_dict . Clear ( ) ;
96
- _queue . Clear ( ) ;
85
+ Interlocked . Exchange ( ref _queue , new ConcurrentBag < Request > ( ) ) ;
97
86
_timer . Dispose ( ) ;
98
87
}
99
88
}
0 commit comments