Skip to content

Commit 0deeb72

Browse files
committed
Always return outermost thread id
-flyby: deprecate get_outer_self_id -flyby: ignoring locks during termination detection Signed-off-by: Hartmut Kaiser <[email protected]>
1 parent b60bcf2 commit 0deeb72

File tree

36 files changed

+558
-408
lines changed

36 files changed

+558
-408
lines changed

components/iostreams/src/server/output_stream.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
3434
ar << valid;
3535
if (valid)
3636
{
37-
ar& data_;
37+
ar & data_;
3838
}
3939
}
4040

@@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
4444
ar >> valid;
4545
if (valid)
4646
{
47-
ar& data_;
47+
ar & data_;
4848
}
4949
}
5050
} // namespace hpx::iostreams::detail
@@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
8989
{ // {{{
9090
// Perform the IO in another OS thread.
9191
detail::buffer in(buf_in);
92-
hpx::get_thread_pool("io_pool")->get_io_service().post(
93-
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
94-
count, std::ref(in),
95-
threads::thread_id_ref_type(threads::get_outer_self_id())));
92+
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
93+
&output_stream::call_write_sync, this, locality_id, count,
94+
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));
9695

9796
// Sleep until the worker thread wakes us up.
9897
this_thread::suspend(threads::thread_schedule_state::suspended,

examples/cancelable_action/cancelable_action/server/cancelable_action.hpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Hartmut Kaiser
1+
// Copyright (c) 2007-2024 Hartmut Kaiser
22
//
33
// SPDX-License-Identifier: BSL-1.0
44
// Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -63,6 +63,9 @@ namespace examples::server {
6363
}
6464
~reset_id()
6565
{
66+
auto const mtx = outer_.mtx_;
67+
std::lock_guard<hpx::mutex> l(*mtx);
68+
6669
[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
6770
outer_.id_ = hpx::thread::id();
6871
HPX_ASSERT(old_value != hpx::thread::id());
@@ -104,9 +107,15 @@ namespace examples::server {
104107
});
105108

106109
auto const mtx = mtx_;
107-
std::lock_guard<hpx::mutex> l(*mtx);
108-
HPX_ASSERT(id_ != hpx::thread::id());
109-
hpx::thread::interrupt(id_);
110+
111+
std::unique_lock<hpx::mutex> l(*mtx);
112+
auto const id = id_;
113+
114+
if (id != hpx::thread::id())
115+
{
116+
l.unlock();
117+
hpx::thread::interrupt(id);
118+
}
110119
}
111120

112121
HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)

libs/core/config/include/hpx/config/threads_stack.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
# endif
5656
# endif
5757
# endif
58-
58+
#
5959
# if HPX_SMALL_STACK_SIZE_TARGET < (2 * HPX_THREADS_STACK_OVERHEAD)
6060
# define HPX_SMALL_STACK_SIZE (2 * HPX_THREADS_STACK_OVERHEAD)
6161
# else

libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Hartmut Kaiser
1+
// Copyright (c) 2007-2024 Hartmut Kaiser
22
//
33
// SPDX-License-Identifier: BSL-1.0
44
// Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -425,6 +425,12 @@ namespace hpx::threads {
425425
runs_as_child_mode_bits = static_cast<std::uint8_t>(bits);
426426
}
427427

428+
void schedule_hint(std::int16_t core) noexcept
429+
{
430+
mode = thread_schedule_hint_mode::thread;
431+
hint = core;
432+
}
433+
428434
/// The hint associated with the mode. The interpretation of this hint
429435
/// depends on the given mode.
430436
std::int16_t hint = -1;

libs/core/execution/tests/unit/bulk_async.cpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2015 Daniel Bourgeois
2+
// Copyright (c) 2024 Hartmut Kaiser
23
//
34
// SPDX-License-Identifier: BSL-1.0
45
// Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -16,16 +17,16 @@
1617
#include <vector>
1718

1819
////////////////////////////////////////////////////////////////////////////////
19-
int bulk_test(
20-
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
20+
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
21+
int passed_through) //-V813
2122
{
2223
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
2324
HPX_TEST_EQ(passed_through, 42);
2425
return value;
2526
}
2627

2728
template <typename Executor>
28-
void test_bulk_sync(Executor& exec)
29+
void test_bulk_sync(Executor&& exec)
2930
{
3031
hpx::thread::id tid = hpx::this_thread::get_id();
3132

@@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
3536
using hpx::placeholders::_1;
3637
using hpx::placeholders::_2;
3738

38-
std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
39-
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
39+
std::vector<int> results =
40+
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
41+
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
4042

4143
HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
4244
}
4345

4446
template <typename Executor>
45-
void test_bulk_async(Executor& exec)
47+
void test_bulk_async(Executor&& exec)
4648
{
4749
hpx::thread::id tid = hpx::this_thread::get_id();
4850

@@ -54,31 +56,42 @@ void test_bulk_async(Executor& exec)
5456

5557
std::vector<hpx::future<int>> results =
5658
hpx::parallel::execution::bulk_async_execute(
57-
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
59+
HPX_FORWARD(Executor, exec),
60+
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
5861

5962
HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
6063
[](hpx::future<int>& lhs, const int& rhs) {
6164
return lhs.get() == rhs;
6265
}));
6366
}
6467

68+
template <typename Executor>
69+
decltype(auto) disable_run_as_child(Executor&& exec)
70+
{
71+
auto hint = hpx::execution::experimental::get_hint(exec);
72+
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
73+
74+
return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
75+
HPX_FORWARD(Executor, exec), hint);
76+
}
77+
6578
////////////////////////////////////////////////////////////////////////////////
6679
int hpx_main()
6780
{
6881
hpx::execution::sequenced_executor seq_exec;
69-
test_bulk_sync(seq_exec);
82+
test_bulk_sync(disable_run_as_child(seq_exec));
7083

7184
hpx::execution::parallel_executor par_exec;
7285
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
73-
test_bulk_async(par_exec);
74-
test_bulk_async(par_fork_exec);
86+
test_bulk_async(disable_run_as_child(par_exec));
87+
test_bulk_async(disable_run_as_child(par_fork_exec));
7588

7689
return hpx::local::finalize();
7790
}
7891

7992
int main(int argc, char* argv[])
8093
{
81-
// By default this test should run on all available cores
94+
// By default, this test should run on all available cores
8295
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
8396

8497
// Initialize and run HPX

libs/core/execution/tests/unit/minimal_async_executor.cpp

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ void apply_test(hpx::latch& l, hpx::thread::id& id, int passed_through)
3636
l.count_down(1);
3737
}
3838

39-
void async_bulk_test(int, hpx::thread::id tid, int passed_through) //-V813
39+
void async_bulk_test(
40+
int, hpx::thread::id const& tid, int passed_through) //-V813
4041
{
4142
HPX_TEST_NEQ(tid, hpx::this_thread::get_id());
4243
HPX_TEST_EQ(passed_through, 42);
@@ -57,22 +58,22 @@ void test_apply(Executor& exec)
5758
}
5859

5960
template <typename Executor>
60-
void test_sync(Executor& exec)
61+
void test_sync(Executor&& exec)
6162
{
6263
HPX_TEST(hpx::parallel::execution::sync_execute(exec, &async_test, 42) !=
6364
hpx::this_thread::get_id());
6465
}
6566

6667
template <typename Executor>
67-
void test_async(Executor& exec)
68+
void test_async(Executor&& exec)
6869
{
6970
HPX_TEST(
7071
hpx::parallel::execution::async_execute(exec, &async_test, 42).get() !=
7172
hpx::this_thread::get_id());
7273
}
7374

7475
template <typename Executor>
75-
void test_bulk_sync(Executor& exec)
76+
void test_bulk_sync(Executor&& exec)
7677
{
7778
hpx::thread::id tid = hpx::this_thread::get_id();
7879

@@ -89,7 +90,7 @@ void test_bulk_sync(Executor& exec)
8990
}
9091

9192
template <typename Executor>
92-
void test_bulk_async(Executor& exec)
93+
void test_bulk_async(Executor&& exec)
9394
{
9495
hpx::thread::id tid = hpx::this_thread::get_id();
9596

@@ -153,18 +154,21 @@ struct test_async_executor1
153154
test_async_executor1 const&, F&& f, Ts&&... ts)
154155
{
155156
++count_async;
156-
return hpx::async(
157-
hpx::launch::async, std::forward<F>(f), std::forward<Ts>(ts)...);
157+
158+
auto policy = hpx::launch::async;
159+
auto hint = policy.hint();
160+
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
161+
policy.set_hint(hint);
162+
163+
return hpx::async(policy, std::forward<F>(f), std::forward<Ts>(ts)...);
158164
}
159165
};
160166

161-
namespace hpx::execution::experimental {
162-
163-
template <>
164-
struct is_two_way_executor<test_async_executor1> : std::true_type
165-
{
166-
};
167-
} // namespace hpx::execution::experimental
167+
template <>
168+
struct hpx::execution::experimental::is_two_way_executor<test_async_executor1>
169+
: std::true_type
170+
{
171+
};
168172

169173
struct test_async_executor2 : test_async_executor1
170174
{
@@ -175,19 +179,22 @@ struct test_async_executor2 : test_async_executor1
175179
test_async_executor2 const&, F&& f, Ts&&... ts)
176180
{
177181
++count_sync;
178-
return hpx::async(
179-
hpx::launch::async, std::forward<F>(f), std::forward<Ts>(ts)...)
182+
183+
auto policy = hpx::launch::async;
184+
auto hint = policy.hint();
185+
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
186+
policy.set_hint(hint);
187+
188+
return hpx::async(policy, std::forward<F>(f), std::forward<Ts>(ts)...)
180189
.get();
181190
}
182191
};
183192

184-
namespace hpx::execution::experimental {
185-
186-
template <>
187-
struct is_two_way_executor<test_async_executor2> : std::true_type
188-
{
189-
};
190-
} // namespace hpx::execution::experimental
193+
template <>
194+
struct hpx::execution::experimental::is_two_way_executor<test_async_executor2>
195+
: std::true_type
196+
{
197+
};
191198

192199
struct test_async_executor3 : test_async_executor1
193200
{
@@ -199,22 +206,26 @@ struct test_async_executor3 : test_async_executor1
199206
test_async_executor3 const&, F f, Shape const& shape, Ts&&... ts)
200207
{
201208
++count_bulk_sync;
209+
210+
auto policy = hpx::launch::async;
211+
auto hint = policy.hint();
212+
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
213+
policy.set_hint(hint);
214+
202215
std::vector<hpx::future<void>> results;
203216
for (auto const& elem : shape)
204217
{
205-
results.push_back(hpx::async(hpx::launch::async, f, elem, ts...));
218+
results.push_back(hpx::async(policy, f, elem, ts...));
206219
}
207220
hpx::when_all(results).get();
208221
}
209222
};
210223

211-
namespace hpx::execution::experimental {
212-
213-
template <>
214-
struct is_two_way_executor<test_async_executor3> : std::true_type
215-
{
216-
};
217-
} // namespace hpx::execution::experimental
224+
template <>
225+
struct hpx::execution::experimental::is_two_way_executor<test_async_executor3>
226+
: std::true_type
227+
{
228+
};
218229

219230
struct test_async_executor4 : test_async_executor1
220231
{
@@ -226,10 +237,16 @@ struct test_async_executor4 : test_async_executor1
226237
test_async_executor4 const&, F f, Shape const& shape, Ts&&... ts)
227238
{
228239
++count_bulk_async;
240+
241+
auto policy = hpx::launch::async;
242+
auto hint = policy.hint();
243+
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);
244+
policy.set_hint(hint);
245+
229246
std::vector<hpx::future<void>> results;
230247
for (auto const& elem : shape)
231248
{
232-
results.push_back(hpx::async(hpx::launch::async, f, elem, ts...));
249+
results.push_back(hpx::async(policy, f, elem, ts...));
233250
}
234251
return results;
235252
}
@@ -261,13 +278,11 @@ struct test_async_executor5 : test_async_executor1
261278
}
262279
};
263280

264-
namespace hpx::execution::experimental {
265-
266-
template <>
267-
struct is_two_way_executor<test_async_executor5> : std::true_type
268-
{
269-
};
270-
} // namespace hpx::execution::experimental
281+
template <>
282+
struct hpx::execution::experimental::is_two_way_executor<test_async_executor5>
283+
: std::true_type
284+
{
285+
};
271286

272287
///////////////////////////////////////////////////////////////////////////////
273288
int hpx_main()
@@ -283,7 +298,7 @@ int hpx_main()
283298

284299
int main(int argc, char* argv[])
285300
{
286-
// By default this test should run on all available cores
301+
// By default, this test should run on all available cores
287302
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
288303

289304
// Initialize and run HPX

0 commit comments

Comments
 (0)