17 changes: 11 additions & 6 deletions thread_pthread.c
@@ -1401,7 +1401,7 @@ void
rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
{
VM_ASSERT(cr == GET_RACTOR());
- VM_ASSERT(vm->ractor.sync.lock_owner == cr); // VM is locked
+ VM_ASSERT(rb_fiber_threadptr(vm->ractor.sync.lock_owner_fiber)->ractor == cr); // VM is locked
VM_ASSERT(!vm->ractor.sched.barrier_waiting);
VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0);
VM_ASSERT(vm->ractor.sched.barrier_ractor == NULL);
@@ -1419,8 +1419,9 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)

// release VM lock
lock_rec = vm->ractor.sync.lock_rec;
+ rb_fiber_t *fiber = vm->ractor.sync.lock_owner_fiber;
vm->ractor.sync.lock_rec = 0;
- vm->ractor.sync.lock_owner = NULL;
+ vm->ractor.sync.lock_owner_fiber = NULL;
rb_native_mutex_unlock(&vm->ractor.sync.lock);

// interrupts all running threads
@@ -1449,7 +1450,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
// acquire VM lock
rb_native_mutex_lock(&vm->ractor.sync.lock);
vm->ractor.sync.lock_rec = lock_rec;
- vm->ractor.sync.lock_owner = cr;
+ vm->ractor.sync.lock_owner_fiber = fiber;
}

// do not release ractor_sched_lock and there is no newly added (resumed) thread
@@ -1500,11 +1501,10 @@ ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th)
}

void
- rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
+ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr, rb_fiber_t *fiber)
{
VM_ASSERT(cr->threads.sched.running != NULL); // running ractor
VM_ASSERT(cr == GET_RACTOR());
- VM_ASSERT(vm->ractor.sync.lock_owner == NULL); // VM is locked, but owner == NULL
VM_ASSERT(vm->ractor.sched.barrier_waiting); // VM needs barrier sync

#if USE_RUBY_DEBUG_LOG || VM_CHECK_MODE > 0
@@ -1583,10 +1583,15 @@ thread_sched_atfork(struct rb_thread_sched *sched)
}
vm->ractor.sched.running_cnt = 0;

+ vm->ractor.sync.lock_rec = 0;
+ th->ec->tag->lock_rec = 0;
+ vm->ractor.sync.lock_owner_fiber = NULL;
+ rb_native_mutex_initialize(&vm->ractor.sync.lock);

rb_native_mutex_initialize(&vm->ractor.sched.lock);
#if VM_CHECK_MODE > 0
- vm->ractor.sched.lock_owner = NULL;
+ vm->ractor.sched.locked = false;
+ vm->ractor.sched.lock_owner = NULL;
#endif

// rb_native_cond_destroy(&vm->ractor.sched.cond);
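Note on the barrier-start hunk above: before interrupting the other threads, rb_ractor_sched_barrier_start saves the recursion count and, after this change, the owning fiber, drops the VM lock, and restores both once every thread has joined the barrier. The following standalone sketch shows that save/release/reacquire pattern with plain pthreads; vm_lock_t, barrier_release, and barrier_reacquire are illustrative stand-ins rather than the VM's actual API, and an opaque void * token takes the place of rb_fiber_t *.

// Sketch only: a recursive lock whose logical owner is an opaque token,
// mirroring lock_rec/lock_owner_fiber. Compile with -pthread.
#include <assert.h>
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    void *owner_fiber;      // which fiber logically holds the lock
    unsigned int lock_rec;  // recursion count
} vm_lock_t;

// Drop the lock for a barrier-style wait, remembering who held it.
static void
barrier_release(vm_lock_t *vl, unsigned int *saved_rec, void **saved_owner)
{
    *saved_rec = vl->lock_rec;
    *saved_owner = vl->owner_fiber;
    vl->lock_rec = 0;
    vl->owner_fiber = NULL;       // nobody owns it while we are waiting
    pthread_mutex_unlock(&vl->lock);
}

// Take the lock back and reinstate the recorded owner and count.
static void
barrier_reacquire(vm_lock_t *vl, unsigned int saved_rec, void *saved_owner)
{
    pthread_mutex_lock(&vl->lock);
    vl->lock_rec = saved_rec;
    vl->owner_fiber = saved_owner;
}

int
main(void)
{
    vm_lock_t vl = { PTHREAD_MUTEX_INITIALIZER, NULL, 0 };
    int fiber_token;              // stand-in for an rb_fiber_t *

    pthread_mutex_lock(&vl.lock);
    vl.owner_fiber = &fiber_token;
    vl.lock_rec = 1;

    unsigned int rec;
    void *owner;
    barrier_release(&vl, &rec, &owner);  // ...other threads run here...
    barrier_reacquire(&vl, rec, owner);

    assert(vl.lock_rec == 1 && vl.owner_fiber == &fiber_token);
    pthread_mutex_unlock(&vl.lock);
    return 0;
}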
2 changes: 0 additions & 2 deletions thread_sync.c
@@ -121,8 +121,6 @@ rb_mutex_num_waiting(rb_mutex_t *mutex)
return n;
}

- rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
-
static void
mutex_free(void *ptr)
{
6 changes: 3 additions & 3 deletions thread_win32.c
@@ -968,9 +968,9 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
}

void
- rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
+ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr, rb_fiber_t *fiber)
{
- vm->ractor.sync.lock_owner = cr;
+ vm->ractor.sync.lock_owner_fiber = fiber;
unsigned int barrier_cnt = vm->ractor.sync.barrier_cnt;
rb_thread_t *th = GET_THREAD();
bool running;
@@ -1005,7 +1005,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
}

- vm->ractor.sync.lock_owner = NULL;
+ vm->ractor.sync.lock_owner_fiber = NULL;
}

bool
11 changes: 6 additions & 5 deletions vm_core.h
@@ -675,7 +675,7 @@ typedef struct rb_vm_struct {
struct {
// monitor
rb_nativethread_lock_t lock;
- struct rb_ractor_struct *lock_owner;
+ struct rb_fiber_struct *lock_owner_fiber;
unsigned int lock_rec;

// join at exit
@@ -2073,18 +2073,18 @@ void rb_ec_vm_lock_rec_release(const rb_execution_context_t *ec,
/* This technically is a data race, as it's checked without the lock, however we
* check against a value only our own thread will write. */
NO_SANITIZE("thread", static inline bool
- vm_locked_by_ractor_p(rb_vm_t *vm, rb_ractor_t *cr))
+ vm_locked_by_fiber_p(rb_vm_t *vm, rb_fiber_t *cur_f))
{
- VM_ASSERT(cr == GET_RACTOR());
- return vm->ractor.sync.lock_owner == cr;
+ VM_ASSERT(cur_f == GET_THREAD()->ec->fiber_ptr);
+ return vm->ractor.sync.lock_owner_fiber == cur_f;
}

static inline unsigned int
rb_ec_vm_lock_rec(const rb_execution_context_t *ec)
{
rb_vm_t *vm = rb_ec_vm_ptr(ec);

- if (!vm_locked_by_ractor_p(vm, rb_ec_ractor_ptr(ec))) {
+ if (!vm_locked_by_fiber_p(vm, ec->fiber_ptr)) {
return 0;
}
else {
@@ -2150,6 +2150,7 @@ void rb_execution_context_mark(const rb_execution_context_t *ec);
void rb_fiber_close(rb_fiber_t *fib);
void Init_native_thread(rb_thread_t *th);
int rb_vm_check_ints_blocking(rb_execution_context_t *ec);
+ rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);

// vm_sync.h
void rb_vm_cond_wait(rb_vm_t *vm, rb_nativethread_cond_t *cond);
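Note on vm_locked_by_fiber_p above: the owner field is read without holding the lock, which the in-tree comment flags as a benign race because each thread compares only against its own current fiber, a value that only that thread ever stores there. Keying ownership to the fiber rather than the ractor also means a different fiber scheduled on the same ractor no longer looks like the lock holder, which appears to be the point of this change. A self-contained sketch of the same ownership test and the recursion-count accessor built on it follows; fiber_t, vm_sync_t, locked_by_fiber_p, and vm_lock_rec are hypothetical names, not the VM's types.

#include <stddef.h>

typedef struct fiber { int id; } fiber_t; // opaque in the real VM

typedef struct {
    fiber_t *lock_owner_fiber; // written only while the lock is held
    unsigned int lock_rec;
} vm_sync_t;

// Unsynchronized read: racy in general, but each caller compares only
// against its own current fiber, which no other thread can store here.
static inline int
locked_by_fiber_p(const vm_sync_t *sync, const fiber_t *cur_f)
{
    return sync->lock_owner_fiber == cur_f;
}

// The recursion count is meaningful only to the owning fiber; any
// other fiber must see 0 instead of someone else's count.
static inline unsigned int
vm_lock_rec(const vm_sync_t *sync, const fiber_t *cur_f)
{
    return locked_by_fiber_p(sync, cur_f) ? sync->lock_rec : 0;
}

int
main(void)
{
    fiber_t f1 = { 1 }, f2 = { 2 };
    vm_sync_t sync = { &f1, 3 };  // f1 holds the lock, 3 levels deep

    // The owner sees its count; a second fiber on the same ractor sees 0.
    return !(vm_lock_rec(&sync, &f1) == 3 && vm_lock_rec(&sync, &f2) == 0);
}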
46 changes: 22 additions & 24 deletions vm_sync.c
@@ -6,13 +6,13 @@
#include "vm_debug.h"

void rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr);
- void rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr);
+ void rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr, rb_fiber_t *fiber);
void rb_ractor_sched_barrier_end(rb_vm_t *vm, rb_ractor_t *cr);

static bool
vm_locked(rb_vm_t *vm)
{
- return vm_locked_by_ractor_p(vm, GET_RACTOR());
+ return vm_locked_by_fiber_p(vm, GET_THREAD()->ec->fiber_ptr);
}

#if RUBY_DEBUG > 0
@@ -62,7 +62,7 @@ vm_need_barrier(bool no_barrier, const rb_ractor_t *cr, const rb_vm_t *vm)
}

static void
- vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
+ vm_lock_enter(rb_ractor_t *cr, rb_fiber_t *fiber, rb_vm_t *vm, bool locked, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
{
RUBY_DEBUG_LOG2(file, line, "start locked:%d", locked);

@@ -76,7 +76,7 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
#endif
// lock
rb_native_mutex_lock(&vm->ractor.sync.lock);
- VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
+ VM_ASSERT(vm->ractor.sync.lock_owner_fiber == NULL);
VM_ASSERT(vm->ractor.sync.lock_rec == 0);

// barrier
@@ -87,26 +87,26 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
do {
VM_ASSERT(vm_need_barrier_waiting(vm));
RUBY_DEBUG_LOG("barrier serial:%u", vm->ractor.sched.barrier_serial);
- rb_ractor_sched_barrier_join(vm, cr);
+ rb_ractor_sched_barrier_join(vm, cr, fiber);
} while (vm_need_barrier_waiting(vm));
}

VM_ASSERT(vm->ractor.sync.lock_rec == 0);
- VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
- vm->ractor.sync.lock_owner = cr;
+ VM_ASSERT(vm->ractor.sync.lock_owner_fiber == NULL);
+ vm->ractor.sync.lock_owner_fiber = fiber;
}

vm->ractor.sync.lock_rec++;
*lev = vm->ractor.sync.lock_rec;

RUBY_DEBUG_LOG2(file, line, "rec:%u owner:%u", vm->ractor.sync.lock_rec,
- (unsigned int)rb_ractor_id(vm->ractor.sync.lock_owner));
+ (unsigned int)rb_ractor_id(cr));
}

static void
vm_lock_leave(rb_vm_t *vm, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
{
- MAYBE_UNUSED(rb_ractor_t *cr = vm->ractor.sync.lock_owner);
+ MAYBE_UNUSED(rb_ractor_t *cr = rb_fiber_threadptr(vm->ractor.sync.lock_owner_fiber)->ractor);

RUBY_DEBUG_LOG2(file, line, "rec:%u owner:%u%s", vm->ractor.sync.lock_rec,
(unsigned int)rb_ractor_id(cr),
@@ -129,7 +129,7 @@ vm_lock_leave(rb_vm_t *vm, bool no_barrier, unsigned int *lev APPEND_LOCATION_ARGS)
*lev = vm->ractor.sync.lock_rec;

if (vm->ractor.sync.lock_rec == 0) {
- vm->ractor.sync.lock_owner = NULL;
+ vm->ractor.sync.lock_owner_fiber = NULL;
rb_native_mutex_unlock(&vm->ractor.sync.lock);
}
}
@@ -139,10 +139,10 @@ rb_vm_lock_enter_body(unsigned int *lev APPEND_LOCATION_ARGS)
{
rb_vm_t *vm = GET_VM();
if (vm_locked(vm)) {
- vm_lock_enter(NULL, vm, true, false, lev APPEND_LOCATION_PARAMS);
+ vm_lock_enter(NULL, NULL, vm, true, false, lev APPEND_LOCATION_PARAMS);
}
else {
- vm_lock_enter(GET_RACTOR(), vm, false, false, lev APPEND_LOCATION_PARAMS);
+ vm_lock_enter(GET_RACTOR(), GET_THREAD()->ec->fiber_ptr, vm, false, false, lev APPEND_LOCATION_PARAMS);
}
}

@@ -151,18 +151,18 @@ rb_vm_lock_enter_body_nb(unsigned int *lev APPEND_LOCATION_ARGS)
{
rb_vm_t *vm = GET_VM();
if (vm_locked(vm)) {
- vm_lock_enter(NULL, vm, true, true, lev APPEND_LOCATION_PARAMS);
+ vm_lock_enter(NULL, NULL, vm, true, true, lev APPEND_LOCATION_PARAMS);
}
else {
- vm_lock_enter(GET_RACTOR(), vm, false, true, lev APPEND_LOCATION_PARAMS);
+ vm_lock_enter(GET_RACTOR(), GET_THREAD()->ec->fiber_ptr, vm, false, true, lev APPEND_LOCATION_PARAMS);
}
}

void
rb_vm_lock_enter_body_cr(rb_ractor_t *cr, unsigned int *lev APPEND_LOCATION_ARGS)
{
rb_vm_t *vm = GET_VM();
- vm_lock_enter(cr, vm, vm_locked(vm), false, lev APPEND_LOCATION_PARAMS);
+ vm_lock_enter(cr, GET_THREAD()->ec->fiber_ptr, vm, vm_locked(vm), false, lev APPEND_LOCATION_PARAMS);
}

void
@@ -174,7 +174,7 @@ rb_vm_lock_leave_body_nb(unsigned int *lev APPEND_LOCATION_ARGS)
void
rb_vm_lock_leave_body(unsigned int *lev APPEND_LOCATION_ARGS)
{
- vm_lock_leave(GET_VM(), false, lev APPEND_LOCATION_PARAMS);
+ vm_lock_leave(GET_VM(), false, lev APPEND_LOCATION_PARAMS);
}

void
@@ -183,7 +183,7 @@ rb_vm_lock_body(LOCATION_ARGS)
rb_vm_t *vm = GET_VM();
ASSERT_vm_unlocking();

- vm_lock_enter(GET_RACTOR(), vm, false, false, &vm->ractor.sync.lock_rec APPEND_LOCATION_PARAMS);
+ vm_lock_enter(GET_RACTOR(), GET_THREAD()->ec->fiber_ptr, vm, false, false, &vm->ractor.sync.lock_rec APPEND_LOCATION_PARAMS);
}

void
@@ -200,18 +200,18 @@ vm_cond_wait(rb_vm_t *vm, rb_nativethread_cond_t *cond, unsigned long msec)
{
ASSERT_vm_locking();
unsigned int lock_rec = vm->ractor.sync.lock_rec;
- rb_ractor_t *cr = vm->ractor.sync.lock_owner;
+ rb_fiber_t *fiber = vm->ractor.sync.lock_owner_fiber;

vm->ractor.sync.lock_rec = 0;
- vm->ractor.sync.lock_owner = NULL;
+ vm->ractor.sync.lock_owner_fiber = NULL;
if (msec > 0) {
rb_native_cond_timedwait(cond, &vm->ractor.sync.lock, msec);
}
else {
rb_native_cond_wait(cond, &vm->ractor.sync.lock);
}
vm->ractor.sync.lock_rec = lock_rec;
- vm->ractor.sync.lock_owner = cr;
+ vm->ractor.sync.lock_owner_fiber = fiber;
}

void
@@ -242,14 +242,12 @@ rb_vm_barrier(void)
RB_DEBUG_COUNTER_INC(vm_sync_barrier);

if (!rb_multi_ractor_p()) {
// no other ractors
return;
}
else {
- rb_vm_t *vm = GET_VM();
- rb_ractor_t *cr = vm->ractor.sync.lock_owner;
-
ASSERT_vm_locking();
+ rb_vm_t *vm = GET_VM();
+ rb_ractor_t *cr = rb_fiber_threadptr(vm->ractor.sync.lock_owner_fiber)->ractor;
VM_ASSERT(cr == GET_RACTOR());
VM_ASSERT(rb_ractor_status_p(cr, ractor_running));

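Note on the vm_cond_wait hunk: a condition wait has to give up the monitor while sleeping, so the function stashes lock_rec and lock_owner_fiber, clears both before the wait releases the mutex, and reinstates them on wakeup so the woken fiber is once again the recorded owner. Below is a minimal pthread rendering of that pattern; monitor_t, monitor_cond_wait, and signaler are invented for the sketch and are not the VM's API.

// Sketch of the save/clear/wait/restore dance in vm_cond_wait().
// Compile with -pthread.
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t lock;
    void *owner_fiber;
    unsigned int lock_rec;
} monitor_t;

static monitor_t mon = { PTHREAD_MUTEX_INITIALIZER, NULL, 0 };
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;

static void
monitor_cond_wait(monitor_t *m, pthread_cond_t *c)
{
    // Caller holds m->lock with m->lock_rec recursions recorded.
    unsigned int rec = m->lock_rec;
    void *owner = m->owner_fiber;

    // Clear ownership: while we sleep, another fiber may take the lock
    // and must not inherit our recursion count or identity.
    m->lock_rec = 0;
    m->owner_fiber = NULL;

    pthread_cond_wait(c, &m->lock); // atomically unlocks, sleeps, relocks

    // The mutex is ours again; reinstate the logical ownership.
    m->lock_rec = rec;
    m->owner_fiber = owner;
}

static void *
signaler(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&mon.lock);
    ready = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mon.lock);
    return NULL;
}

int
main(void)
{
    int fiber_token; // stand-in for an rb_fiber_t *
    pthread_t t;

    pthread_mutex_lock(&mon.lock);
    mon.owner_fiber = &fiber_token;
    mon.lock_rec = 1;

    pthread_create(&t, NULL, signaler, NULL);
    while (!ready) {
        monitor_cond_wait(&mon, &cond); // ownership cleared while asleep
    }
    pthread_mutex_unlock(&mon.lock);
    return pthread_join(t, NULL);
}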