Skip to content

Commit 8d9a4b9

Browse files
authoredJul 22, 2024··
Merge pull request #7 from idoleat/ch5-examples
Add new example for section 5
2 parents 8a7c0a2 + 11baa72 commit 8d9a4b9

File tree

6 files changed

+540
-121
lines changed

6 files changed

+540
-121
lines changed
 

‎concurrency-primer.tex

+196-121
Large diffs are not rendered by default.

‎examples/.clang-format

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
Language: Cpp
2+
3+
AccessModifierOffset: -4
4+
AlignAfterOpenBracket: Align
5+
AlignConsecutiveAssignments: false
6+
AlignConsecutiveDeclarations: false
7+
AlignOperands: true
8+
AlignTrailingComments: false
9+
AllowAllParametersOfDeclarationOnNextLine: false
10+
AllowShortBlocksOnASingleLine: false
11+
AllowShortCaseLabelsOnASingleLine: false
12+
AllowShortFunctionsOnASingleLine: None
13+
AllowShortIfStatementsOnASingleLine: false
14+
AllowShortLoopsOnASingleLine: false
15+
AlwaysBreakAfterDefinitionReturnType: None
16+
AlwaysBreakAfterReturnType: None
17+
AlwaysBreakBeforeMultilineStrings: false
18+
AlwaysBreakTemplateDeclarations: false
19+
BinPackArguments: true
20+
BinPackParameters: true
21+
22+
BraceWrapping:
23+
AfterClass: false
24+
AfterControlStatement: false
25+
AfterEnum: false
26+
AfterFunction: true
27+
AfterNamespace: true
28+
AfterObjCDeclaration: false
29+
AfterStruct: false
30+
AfterUnion: false
31+
AfterExternBlock: false
32+
BeforeCatch: false
33+
BeforeElse: false
34+
IndentBraces: false
35+
SplitEmptyFunction: true
36+
SplitEmptyRecord: true
37+
SplitEmptyNamespace: true
38+
39+
BreakBeforeBinaryOperators: None
40+
BreakBeforeBraces: Custom
41+
BreakBeforeInheritanceComma: false
42+
BreakBeforeTernaryOperators: false
43+
BreakConstructorInitializersBeforeComma: false
44+
BreakConstructorInitializers: BeforeComma
45+
BreakAfterJavaFieldAnnotations: false
46+
BreakStringLiterals: false
47+
ColumnLimit: 80
48+
CommentPragmas: '^ IWYU pragma:'
49+
CompactNamespaces: false
50+
ConstructorInitializerAllOnOneLineOrOnePerLine: false
51+
ConstructorInitializerIndentWidth: 4
52+
ContinuationIndentWidth: 4
53+
Cpp11BracedListStyle: false
54+
DerivePointerAlignment: false
55+
DisableFormat: false
56+
ExperimentalAutoDetectBinPacking: false
57+
FixNamespaceComments: false
58+
59+
ForEachMacros:
60+
- 'list_for_each'
61+
- 'list_for_each_safe'
62+
63+
IncludeBlocks: Preserve
64+
IncludeCategories:
65+
- Regex: '.*'
66+
Priority: 1
67+
IncludeIsMainRegex: '(Test)?$'
68+
IndentCaseLabels: false
69+
IndentPPDirectives: None
70+
IndentWidth: 4
71+
IndentWrappedFunctionNames: false
72+
KeepEmptyLinesAtTheStartOfBlocks: false
73+
MacroBlockBegin: ''
74+
MacroBlockEnd: ''
75+
MaxEmptyLinesToKeep: 1
76+
NamespaceIndentation: None
77+
78+
PointerAlignment: Right
79+
ReflowComments: false
80+
SortIncludes: false
81+
SortUsingDeclarations: false
82+
SpaceAfterCStyleCast: false
83+
SpaceAfterTemplateKeyword: true
84+
SpaceBeforeAssignmentOperators: true
85+
SpaceBeforeCtorInitializerColon: true
86+
SpaceBeforeInheritanceColon: true
87+
SpaceBeforeParens: ControlStatements
88+
SpaceBeforeRangeBasedForLoopColon: true
89+
SpaceInEmptyParentheses: false
90+
SpacesBeforeTrailingComments: 1
91+
SpacesInAngles: false
92+
SpacesInContainerLiterals: false
93+
SpacesInCStyleCastParentheses: false
94+
SpacesInParentheses: false
95+
SpacesInSquareBrackets: false
96+
Standard: Cpp03
97+
TabWidth: 4
98+
UseTab: Never

‎examples/Makefile

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
all:
2+
$(CC) -Wall -o rmw_example rmw_example.c -pthread -lm
3+
clean:
4+
rm -f rmw_example
5+
check: all
6+
./rmw_example

‎examples/rmw_example.c

+240
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#include <stdio.h>
2+
#include <stdatomic.h>
3+
#include <threads.h>
4+
#include <stdlib.h>
5+
#include <stdbool.h>
6+
#include <assert.h>
7+
#include <math.h>
8+
9+
#define PRECISION 100 /* upper bound in BPP sum */
10+
#define CACHE_LINE_SIZE 64
11+
#define N_THREADS 64
12+
13+
struct tpool_future {
14+
void *result;
15+
void *arg;
16+
atomic_flag flag;
17+
};
18+
19+
typedef struct job {
20+
void *(*func)(void *);
21+
struct tpool_future *future;
22+
struct job *next, *prev;
23+
} job_t;
24+
25+
typedef struct idle_job {
26+
_Atomic(job_t *) prev;
27+
char padding[CACHE_LINE_SIZE -
28+
sizeof(_Atomic(job_t *))]; /* avoid false sharing */
29+
job_t job;
30+
} idle_job_t;
31+
32+
enum state { idle, running, cancelled };
33+
34+
typedef struct tpool {
35+
atomic_flag initialezed;
36+
int size;
37+
thrd_t *pool;
38+
atomic_int state;
39+
thrd_start_t func;
40+
idle_job_t *head; /* job queue is a SPMC ring buffer */
41+
} tpool_t;
42+
43+
static struct tpool_future *tpool_future_create(void *arg)
44+
{
45+
struct tpool_future *future = malloc(sizeof(struct tpool_future));
46+
if (future) {
47+
future->result = NULL;
48+
future->arg = arg;
49+
atomic_flag_clear(&future->flag);
50+
atomic_flag_test_and_set(&future->flag);
51+
}
52+
return future;
53+
}
54+
55+
void tpool_future_wait(struct tpool_future *future)
56+
{
57+
while (atomic_flag_test_and_set(&future->flag))
58+
;
59+
}
60+
61+
void tpool_future_destroy(struct tpool_future *future)
62+
{
63+
free(future->result);
64+
free(future);
65+
}
66+
67+
static int worker(void *args)
68+
{
69+
if (!args)
70+
return EXIT_FAILURE;
71+
tpool_t *thrd_pool = (tpool_t *)args;
72+
73+
while (1) {
74+
/* worker is laid off */
75+
if (atomic_load(&thrd_pool->state) == cancelled)
76+
return EXIT_SUCCESS;
77+
if (atomic_load(&thrd_pool->state) == running) {
78+
/* worker takes the job */
79+
job_t *job = atomic_load(&thrd_pool->head->prev);
80+
/* worker checks if there is only an idle job in the job queue */
81+
if (job == &thrd_pool->head->job) {
82+
/* worker says it is idle */
83+
atomic_store(&thrd_pool->state, idle);
84+
thrd_yield();
85+
continue;
86+
}
87+
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
88+
job->prev))
89+
;
90+
job->future->result = (void *)job->func(job->future->arg);
91+
atomic_flag_clear(&job->future->flag);
92+
free(job);
93+
} else {
94+
/* worker is idle */
95+
thrd_yield();
96+
}
97+
};
98+
return EXIT_SUCCESS;
99+
}
100+
101+
static bool tpool_init(tpool_t *thrd_pool, size_t size)
102+
{
103+
if (atomic_flag_test_and_set(&thrd_pool->initialezed)) {
104+
printf("This thread pool has already been initialized.\n");
105+
return false;
106+
}
107+
108+
assert(size > 0);
109+
thrd_pool->pool = malloc(sizeof(thrd_t) * size);
110+
if (!thrd_pool->pool) {
111+
printf("Failed to allocate thread identifiers.\n");
112+
return false;
113+
}
114+
115+
idle_job_t *idle_job = malloc(sizeof(idle_job_t));
116+
if (!idle_job) {
117+
printf("Failed to allocate idle job.\n");
118+
return false;
119+
}
120+
121+
/* idle_job will always be the first job */
122+
idle_job->job.next = &idle_job->job;
123+
idle_job->job.prev = &idle_job->job;
124+
idle_job->prev = &idle_job->job;
125+
thrd_pool->func = worker;
126+
thrd_pool->head = idle_job;
127+
thrd_pool->state = idle;
128+
thrd_pool->size = size;
129+
130+
/* employer hires many workers */
131+
for (size_t i = 0; i < size; i++)
132+
thrd_create(thrd_pool->pool + i, worker, thrd_pool);
133+
134+
return true;
135+
}
136+
137+
static void tpool_destroy(tpool_t *thrd_pool)
138+
{
139+
if (atomic_exchange(&thrd_pool->state, cancelled))
140+
printf("Thread pool cancelled with jobs still running.\n");
141+
142+
for (int i = 0; i < thrd_pool->size; i++)
143+
thrd_join(thrd_pool->pool[i], NULL);
144+
145+
while (thrd_pool->head->prev != &thrd_pool->head->job) {
146+
job_t *job = thrd_pool->head->prev->prev;
147+
free(thrd_pool->head->prev);
148+
thrd_pool->head->prev = job;
149+
}
150+
free(thrd_pool->head);
151+
free(thrd_pool->pool);
152+
atomic_fetch_and(&thrd_pool->state, 0);
153+
atomic_flag_clear(&thrd_pool->initialezed);
154+
}
155+
156+
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
157+
static void *bbp(void *arg)
158+
{
159+
int k = *(int *)arg;
160+
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
161+
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
162+
double *product = malloc(sizeof(double));
163+
if (!product)
164+
return NULL;
165+
166+
*product = 1 / pow(16, k) * sum;
167+
return (void *)product;
168+
}
169+
170+
struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
171+
void *arg)
172+
{
173+
job_t *job = malloc(sizeof(job_t));
174+
if (!job)
175+
return NULL;
176+
177+
struct tpool_future *future = tpool_future_create(arg);
178+
if (!future) {
179+
free(job);
180+
return NULL;
181+
}
182+
183+
job->func = func;
184+
job->future = future;
185+
job->next = thrd_pool->head->job.next;
186+
job->prev = &thrd_pool->head->job;
187+
thrd_pool->head->job.next->prev = job;
188+
thrd_pool->head->job.next = job;
189+
if (thrd_pool->head->prev == &thrd_pool->head->job) {
190+
thrd_pool->head->prev = job;
191+
/* the previous job of the idle job is itself */
192+
thrd_pool->head->job.prev = &thrd_pool->head->job;
193+
}
194+
return future;
195+
}
196+
197+
static inline void wait_until(tpool_t *thrd_pool, int state)
198+
{
199+
while (atomic_load(&thrd_pool->state) != state)
200+
thrd_yield();
201+
}
202+
203+
int main()
204+
{
205+
int bbp_args[PRECISION];
206+
struct tpool_future *futures[PRECISION];
207+
double bbp_sum = 0;
208+
209+
tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
210+
if (!tpool_init(&thrd_pool, N_THREADS)) {
211+
printf("failed to init.\n");
212+
return 0;
213+
}
214+
/* employer ask workers to work */
215+
atomic_store(&thrd_pool.state, running);
216+
217+
/* employer wait ... until workers are idle */
218+
wait_until(&thrd_pool, idle);
219+
220+
/* employer add more job to the job queue */
221+
for (int i = 0; i < PRECISION; i++) {
222+
bbp_args[i] = i;
223+
futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
224+
}
225+
226+
/* employer ask workers to work */
227+
atomic_store(&thrd_pool.state, running);
228+
229+
/* employer wait for the result of job */
230+
for (int i = 0; i < PRECISION; i++) {
231+
tpool_future_wait(futures[i]);
232+
bbp_sum += *(double *)(futures[i]->result);
233+
tpool_future_destroy(futures[i]);
234+
}
235+
236+
/* employer destroys the job queue and lays workers off */
237+
tpool_destroy(&thrd_pool);
238+
printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
239+
return 0;
240+
}

‎images/atomic-rmw.pdf

9.49 KB
Binary file not shown.

‎images/atomic-types.pdf

12.2 KB
Binary file not shown.

0 commit comments

Comments
 (0)
Please sign in to comment.