Skip to content

Commit

Permalink
Unified Enumerable TLS without TBB and for all host backends (#2048)
Browse files Browse the repository at this point in the history
This avoids the indirect windows.h inclusion that occurred with the TBB parallel backend when using TBB's enumerable_thread_specific class.

---------

Signed-off-by: Dan Hoeflinger <[email protected]>
  • Loading branch information
danhoeflinger authored Feb 5, 2025
1 parent 3fc6568 commit 4a9d3f4
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 99 deletions.
92 changes: 20 additions & 72 deletions include/oneapi/dpl/pstl/omp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@
#ifndef _ONEDPL_INTERNAL_OMP_UTIL_H
#define _ONEDPL_INTERNAL_OMP_UTIL_H

#include <algorithm>
#include <atomic>
#include <iterator>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <vector>
#include <type_traits>
#include <iterator> //std::iterator_traits, std::distance
#include <cstddef> //std::size_t
#include <memory> //std::allocator
#include <type_traits> // std::decay, is_integral_v, enable_if_t
#include <utility> // std::forward
#include <omp.h>
#include <tuple>

#include "../parallel_backend_utils.h"
#include "../unseq_backend_simd.h"
Expand Down Expand Up @@ -157,83 +152,36 @@ __process_chunk(const __chunk_metrics& __metrics, _Iterator __base, _Index __chu

namespace __detail
{

template <typename _ValueType, typename... _Args>
struct __enumerable_thread_local_storage
struct __get_num_threads
{
template <typename... _LocalArgs>
__enumerable_thread_local_storage(_LocalArgs&&... __args)
: __num_elements(0), __args(std::forward<_LocalArgs>(__args)...)
{
std::size_t __num_threads = omp_in_parallel() ? omp_get_num_threads() : omp_get_max_threads();
__thread_specific_storage.resize(__num_threads);
}

// Note: size should not be used concurrently with parallel loops which may instantiate storage objects, as it may
// not return an accurate count of instantiated storage objects in lockstep with the number allocated and stored.
// This is because the count is not atomic with the allocation and storage of the storage objects.
std::size_t
size() const
operator()() const
{
// only count storage which has been instantiated
return __num_elements.load();
}

// Note: get_with_id should not be used concurrently with parallel loops which may instantiate storage objects,
// as its operation may provide an out of date view of the stored objects based on the timing new object creation
// and incrementing of the size.
// TODO: Consider replacing this access with a visitor pattern.
_ValueType&
get_with_id(std::size_t __i)
{
assert(__i < size());

std::size_t __j = 0;

if (size() == __thread_specific_storage.size())
{
return *__thread_specific_storage[__i];
}

for (std::size_t __count = 0; __j < __thread_specific_storage.size() && __count <= __i; ++__j)
{
// Only include storage from threads which have instantiated a storage object
if (__thread_specific_storage[__j])
{
++__count;
}
}
// Need to back up one once we have found a valid storage object
return *__thread_specific_storage[__j - 1];
return omp_in_parallel() ? omp_get_num_threads() : omp_get_max_threads();
}
};

_ValueType&
get_for_current_thread()
struct __get_thread_num
{
std::size_t
operator()() const
{
std::size_t __i = omp_get_thread_num();
if (!__thread_specific_storage[__i])
{
// create temporary storage on first usage to avoid extra parallel region and unnecessary instantiation
__thread_specific_storage[__i] =
std::apply([](_Args... __arg_pack) { return std::make_unique<_ValueType>(__arg_pack...); }, __args);
__num_elements.fetch_add(1);
}
return *__thread_specific_storage[__i];
return omp_get_thread_num();
}

std::vector<std::unique_ptr<_ValueType>> __thread_specific_storage;
std::atomic_size_t __num_elements;
std::tuple<_Args...> __args;
};

} // namespace __detail

// enumerable thread local storage should only be created from make function
template <typename _ValueType, typename... Args>
__detail::__enumerable_thread_local_storage<_ValueType, Args...>
oneapi::dpl::__utils::__detail::__enumerable_thread_local_storage<
_ValueType, oneapi::dpl::__omp_backend::__detail::__get_num_threads,
oneapi::dpl::__omp_backend::__detail::__get_thread_num, Args...>
__make_enumerable_tls(Args&&... __args)
{
return __detail::__enumerable_thread_local_storage<_ValueType, Args...>(std::forward<Args>(__args)...);
return oneapi::dpl::__utils::__detail::__enumerable_thread_local_storage<
_ValueType, oneapi::dpl::__omp_backend::__detail::__get_num_threads,
oneapi::dpl::__omp_backend::__detail::__get_thread_num, Args...>(std::forward<Args>(__args)...);
}

} // namespace __omp_backend
Expand Down
43 changes: 16 additions & 27 deletions include/oneapi/dpl/pstl/parallel_backend_tbb.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <tbb/parallel_invoke.h>
#include <tbb/task_arena.h>
#include <tbb/tbb_allocator.h>
#include <tbb/enumerable_thread_specific.h>
#if TBB_INTERFACE_VERSION > 12000
# include <tbb/task.h>
#endif
Expand Down Expand Up @@ -1309,45 +1308,35 @@ __parallel_for_each(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy

namespace __detail
{

template <typename _ValueType>
struct __enumerable_thread_local_storage
struct __get_num_threads
{
template <typename... _LocalArgs>
__enumerable_thread_local_storage(_LocalArgs&&... __args)
: __thread_specific_storage(std::forward<_LocalArgs>(__args)...)
{
}

std::size_t
size() const
{
return __thread_specific_storage.size();
}

_ValueType&
get_for_current_thread()
operator()() const
{
return __thread_specific_storage.local();
return tbb::this_task_arena::max_concurrency();
}
};

_ValueType&
get_with_id(std::size_t __i)
struct __get_thread_num
{
std::size_t
operator()() const
{
return __thread_specific_storage.begin()[__i];
return tbb::this_task_arena::current_thread_index();
}

tbb::enumerable_thread_specific<_ValueType> __thread_specific_storage;
};

} // namespace __detail
} //namespace __detail

// enumerable thread local storage should only be created from make function
template <typename _ValueType, typename... Args>
__detail::__enumerable_thread_local_storage<_ValueType>
oneapi::dpl::__utils::__detail::__enumerable_thread_local_storage<
_ValueType, oneapi::dpl::__tbb_backend::__detail::__get_num_threads,
oneapi::dpl::__tbb_backend::__detail::__get_thread_num, Args...>
__make_enumerable_tls(Args&&... __args)
{
return __detail::__enumerable_thread_local_storage<_ValueType>(std::forward<Args>(__args)...);
return oneapi::dpl::__utils::__detail::__enumerable_thread_local_storage<
_ValueType, oneapi::dpl::__tbb_backend::__detail::__get_num_threads,
oneapi::dpl::__tbb_backend::__detail::__get_thread_num, Args...>(std::forward<Args>(__args)...);
}

} // namespace __tbb_backend
Expand Down
76 changes: 76 additions & 0 deletions include/oneapi/dpl/pstl/parallel_backend_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
#ifndef _ONEDPL_PARALLEL_BACKEND_UTILS_H
#define _ONEDPL_PARALLEL_BACKEND_UTILS_H

#include <atomic>
#include <cstddef>
#include <iterator>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>
#include <cassert>
#include "utils.h"
#include "memory_fwd.h"
Expand Down Expand Up @@ -301,6 +306,77 @@ __set_symmetric_difference_construct(_ForwardIterator1 __first1, _ForwardIterato
return __cc_range(__first2, __last2, __result);
}

namespace __detail
{

// Unified enumerable thread-local storage shared by the host backends.
// Each thread lazily constructs its own _ValueType on first access via
// get_for_current_thread(), so no extra parallel region is needed for setup.
//
// _GetNumThreads / _GetThreadNum are stateless functors supplied by the
// backend which report the number of threads and the calling thread's index
// (e.g. OpenMP or TBB runtime queries).
// NOTE(review): get_for_current_thread() indexes the slot vector with
// _GetThreadNum{}() unchecked; it assumes that index is always below the
// thread count captured at construction time — confirm for each backend.
template <typename _ValueType, typename _GetNumThreads, typename _GetThreadNum, typename... _Args>
struct __enumerable_thread_local_storage
{

    // Sizes the per-thread slot vector up front (one unique_ptr slot per
    // potential thread) and stores the argument pack for later per-thread
    // construction. No _ValueType objects are created here.
    template <typename... _LocalArgs>
    __enumerable_thread_local_storage(_LocalArgs&&... __args)
        : __thread_specific_storage(_GetNumThreads{}()), __num_elements(0), __args(std::forward<_LocalArgs>(__args)...)
    {
    }

    // Number of storage objects instantiated so far.
    // Note: size should not be used concurrently with parallel loops which may instantiate storage objects, as it may
    // not return an accurate count of instantiated storage objects in lockstep with the number allocated and stored.
    // This is because the count is not atomic with the allocation and storage of the storage objects.
    std::size_t
    size() const
    {
        // only count storage which has been instantiated
        return __num_elements.load(std::memory_order_relaxed);
    }

    // Returns the __i-th instantiated storage object (0 <= __i < size()).
    // Note: get_with_id should not be used concurrently with parallel loops which may instantiate storage objects,
    // as its operation may provide an out-of-date view of the stored objects based on the timing of new object
    // creation and incrementing of the size.
    // TODO: Consider replacing this access with a visitor pattern.
    _ValueType&
    get_with_id(std::size_t __i)
    {
        assert(__i < size());

        // Fast path: every slot is populated, so the slot index equals __i.
        if (size() == __thread_specific_storage.size())
        {
            return *__thread_specific_storage[__i];
        }

        // Slow path: scan the slots, counting only populated ones, until the
        // (__i + 1)-th populated slot has been passed.
        std::size_t __j = 0;
        for (std::size_t __count = 0; __j < __thread_specific_storage.size() && __count <= __i; ++__j)
        {
            // Only include storage from threads which have instantiated a storage object
            if (__thread_specific_storage[__j])
            {
                ++__count;
            }
        }
        // Need to back up one once we have found a valid storage object
        return *__thread_specific_storage[__j - 1];
    }

    // Returns the calling thread's storage object, constructing it from the
    // stored argument pack on first use. Presumably safe for concurrent calls
    // from distinct threads because each thread writes only its own slot
    // (relies on _GetThreadNum giving each concurrent thread a unique index).
    _ValueType&
    get_for_current_thread()
    {
        const std::size_t __i = _GetThreadNum{}();
        std::unique_ptr<_ValueType>& __thread_local_storage = __thread_specific_storage[__i];
        if (!__thread_local_storage)
        {
            // create temporary storage on first usage to avoid extra parallel region and unnecessary instantiation
            __thread_local_storage =
                std::apply([](_Args... __arg_pack) { return std::make_unique<_ValueType>(__arg_pack...); }, __args);
            __num_elements.fetch_add(1, std::memory_order_relaxed);
        }
        return *__thread_local_storage;
    }

    // One slot per potential thread; a null unique_ptr marks a slot whose
    // thread has not requested storage yet.
    std::vector<std::unique_ptr<_ValueType>> __thread_specific_storage;
    // Count of instantiated objects; may lag the slots (see size()).
    std::atomic_size_t __num_elements;
    // Argument pack from which each thread's _ValueType is constructed.
    const std::tuple<_Args...> __args;
};

} // namespace __detail
} // namespace __utils
} // namespace dpl
} // namespace oneapi
Expand Down

0 comments on commit 4a9d3f4

Please sign in to comment.