-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlockless_queue.h
More file actions
136 lines (117 loc) · 3.2 KB
/
lockless_queue.h
File metadata and controls
136 lines (117 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifndef LOCKLESS_QUEUE
#define LOCKLESS_QUEUE
#include "access_guard.h"

#include <atomic>               // std::atomic<bool> for the running flag
#include <condition_variable>
#include <memory>               // std::shared_ptr and its atomic free functions
#include <mutex>                // std::mutex / std::lock_guard / std::unique_lock
namespace threading
{
/// A multi-producer / multi-consumer queue built on a singly linked list of
/// shared_ptr nodes. Enqueue/dequeue use the std::atomic_* free functions on
/// shared_ptr (lock-free CAS on the link pointers); a condition variable is
/// used only to let blocking consumers sleep while the queue is empty.
/// Lifetime is guarded by the project-provided access_guard / deletion_lock.
template<typename T>
class lockless_queue
{
private:
	// Internal list node. The head node is a data-less sentinel
	// (isHead == true); real elements hang off head->next.
	template<typename U>
	class node
	{
		friend class lockless_queue;
		// Data node: copies the payload into a shared_ptr.
		explicit node(const U &data) :
			data(new U(data)),
			next(nullptr),
			isHead(false)
		{}
	private:
		// Sentinel (head) node: no payload.
		node() :
			data(nullptr),
			next(nullptr),
			isHead(true)
		{}
	public:
		const bool isHead;
		std::shared_ptr<U> data;
		std::shared_ptr<node<U>> next;
	};
public:
	lockless_queue()
		:
		m_head(new node<T>()),
		m_accesGuard(true),
		m_running(true)
	{}
	~lockless_queue()
	{
		// Flip the running flag under the waiter mutex so a consumer that
		// has checked m_running but not yet called wait() cannot miss the
		// shutdown notification (lost-wakeup fix).
		{
			std::lock_guard<std::mutex> l_lock(m_newDataWaiterMutex);
			m_running = false;
		}
		m_newDataWaiter.notify_all();
		m_accesGuard.close();
	}
	/// Appends a copy of data to the tail of the queue.
	/// Wakes one blocked consumer when the queue transitions from empty.
	void produce(const T &&data)
	{
		deletion_lock l_deletionLock(m_accesGuard);
		if (!l_deletionLock || !m_running)
			return;
		// The new node to be linked at the tail.
		std::shared_ptr<node<T>> l_newNode(new node<T>(data));
		// CAS expects the tail's next to still be null.
		std::shared_ptr<node<T>> l_expectedNullPointer;
		// Walk to the current last node.
		std::shared_ptr<node<T>> l_lastNode;
		std::shared_ptr<node<T>> l_nextLastNode = std::atomic_load(&m_head);
		do
		{
			l_lastNode = l_nextLastNode;
			l_nextLastNode = std::atomic_load(&(l_lastNode->next));
		} while (l_nextLastNode);
		// Link the new node; on CAS failure the expected value holds the
		// node that beat us, so keep chasing the tail from there.
		while (!std::atomic_compare_exchange_weak(&(l_lastNode->next), &l_expectedNullPointer, l_newNode))
		{
			l_lastNode = l_expectedNullPointer;
			l_expectedNullPointer.reset();
		}
		// Queue went empty -> non-empty: notify while holding the waiter
		// mutex so a consumer between its empty-check and wait() cannot
		// miss this wake-up (lost-wakeup fix).
		if (l_lastNode->isHead)
		{
			std::lock_guard<std::mutex> l_lock(m_newDataWaiterMutex);
			m_newDataWaiter.notify_one();
		}
	}
	/// Removes and returns the element at the front of the queue.
	/// Returns nullptr when the queue is empty (non-blocking call) or when
	/// the queue is shutting down. With blockingCall == true, sleeps until
	/// an element arrives or the queue is destroyed.
	std::shared_ptr<T> consume(bool blockingCall = false)
	{
		deletion_lock l_deletionLock(m_accesGuard);
		if (!l_deletionLock || !m_running)
			return nullptr;
		std::shared_ptr<node<T>> l_head = std::atomic_load(&m_head);
		// The candidate element to consume.
		std::shared_ptr<node<T>> l_snack = std::atomic_load(&(l_head->next));
		do
		{
			if (!l_snack)
			{
				if (!blockingCall)
					return nullptr;
				std::unique_lock<std::mutex> l_newDataWaiterLock(m_newDataWaiterMutex);
				// Re-load under the mutex: a producer may have inserted
				// (and notified) between our last load and acquiring the
				// lock (lost-wakeup fix).
				l_snack = std::atomic_load(&(l_head->next));
				while (!l_snack)
				{
					if (!m_running)	// bail out if destroyed during the wait
						return nullptr;
					m_newDataWaiter.wait(l_newDataWaiterLock);
					l_snack = std::atomic_load(&(l_head->next));
				}
			}
			// Unlink l_snack; on CAS failure l_snack is refreshed with the
			// current head->next (possibly null again), so loop back to the
			// empty-check above.
		} while (!std::atomic_compare_exchange_weak(&(l_head->next), &l_snack, l_snack->next));
		// The CAS only succeeds with a non-null l_snack (guarded above).
		return std::shared_ptr<T>(l_snack->data);
	}
private:
	// Sentinel head node; accessed only through the std::atomic_* free
	// functions on shared_ptr.
	std::shared_ptr<node<T>> m_head;
	std::mutex m_newDataWaiterMutex;
	std::condition_variable m_newDataWaiter;
	access_guard m_accesGuard;
	// Cleared by the destructor; atomic because producers/consumers read it
	// without holding m_newDataWaiterMutex.
	std::atomic<bool> m_running;
};
}
#endif // !LOCKLESS_QUEUE