Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ require 'rake/testtask'
Rake::TestTask.new 'test' do |t|
t.libs = %w(lib test)
t.pattern = "test/*_test.rb"
t.verbose = true
t.warning = true
end

# ==========================================================
Expand Down
5 changes: 2 additions & 3 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,12 @@ semian_resource_reset_workers(VALUE self)
VALUE
semian_resource_unregister_worker(VALUE self)
{
int ret;
semian_resource_t *res = NULL;

TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

sem_meta_lock(res->sem_id);
ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL);
dprintf("Unregistering worker for sem_id:%d", res->sem_id);
int ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL);
sem_meta_unlock(res->sem_id);

if ( ret == -1) {
Expand Down
99 changes: 57 additions & 42 deletions ext/semian/sliding_window.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ static void init_fn(void* ptr)
res->max_size = 0;
res->length = 0;
res->start = 0;
res->end = 0;
}

static int
Expand Down Expand Up @@ -92,16 +91,17 @@ check_scale_factor_arg(VALUE scale_factor)
}

static VALUE
grow_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
grow_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

int end = window->max_size ? (window->start + window->length) % window->max_size : 0;
dprintf("Growing window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size);

if (window->length == 0) {
window->start = 0;
window->end = 0;
} else if (window->end > window->start) {
} else if (end > window->start) {
// Easy case - the window doesn't wrap around
window->end = window->start + window->length;
} else {
// Hard case - the window wraps, and data might need to move
int offset = new_max_size - window->max_size;
Expand All @@ -125,24 +125,25 @@ static void swap(int *a, int *b) {
}

static VALUE
shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
shrink_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

int new_length = (new_max_size > window->length) ? window->length : new_max_size;

dprintf("Shrinking window - start:%d end:%d length:%d max_size:%d", window->start, window->end, window->length, window->max_size);
int end = window->max_size ? (window->start + window->length) % window->max_size : 0;
dprintf("Shrinking window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size);

if (window->length == 0) {
window->start = 0;
window->end = 0;
} else if (window->end > window->start) {
} else if (end > window->start) {
// Easy case - the window doesn't wrap around
window->start = window->start + new_length;
} else {
// Hard case - the window wraps, so re-index the data
// Adapted from http://www.cplusplus.com/reference/algorithm/rotate/
int first = 0;
int middle = (window->end - new_max_size + window->max_size) % window->max_size;
int middle = (end - new_max_size + window->max_size) % window->max_size;
int last = window->max_size;
int next = middle;
while (first != next) {
Expand All @@ -154,7 +155,6 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
}
}
window->start = 0;
window->end = new_length;
}

window->max_size = new_max_size;
Expand All @@ -164,18 +164,14 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
}

static VALUE
resize_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
resize_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

if (window->max_size < new_max_size) {
dprintf("Growing window to %d", new_max_size);
return grow_window(window, new_max_size);
return grow_window(sem_id, window, new_max_size);
} else if (window->max_size > new_max_size) {
dprintf("Shrinking window to %d", new_max_size);
return shrink_window(window, new_max_size);
} else {
dprintf("Not re-sizing window");
return shrink_window(sem_id, window, new_max_size);
}

return Qnil;
Expand All @@ -202,6 +198,7 @@ Init_SlidingWindow()
rb_define_alloc_func(cSlidingWindow, semian_simple_sliding_window_alloc);
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 3);
rb_define_method(cSlidingWindow, "size", semian_simple_sliding_window_size, 0);
rb_define_method(cSlidingWindow, "length", semian_simple_sliding_window_size, 0); // Alias
rb_define_method(cSlidingWindow, "resize_to", semian_simple_sliding_window_resize_to, 1);
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size_get, 0);
rb_define_method(cSlidingWindow, "max_size=", semian_simple_sliding_window_max_size_set, 1);
Expand Down Expand Up @@ -270,7 +267,7 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size,
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale * res->error_threshold));

dprintf(" workers:%d scale:%0.2f error_threshold:%d", workers, scale, error_threshold);
resize_window(res->shmem, error_threshold);
resize_window(res->sem_id, res->shmem, error_threshold);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -305,7 +302,7 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
retval = resize_window(res->sem_id, res->shmem, new_max_size);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -340,7 +337,7 @@ semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size)

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
retval = resize_window(res->sem_id, res->shmem, new_max_size);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -393,13 +390,25 @@ semian_simple_sliding_window_clear(VALUE self)
dprintf("Clearing sliding window");
res->shmem->length = 0;
res->shmem->start = 0;
res->shmem->end = 0;
}
sem_meta_unlock(res->sem_id);

return self;
}

// Handy for debugging the sliding window, but too noisy for regular debugging.
/*
static void dprint_window(semian_simple_sliding_window_shared_t *window)
{
dprintf("---");
for (int i = 0; i < window->length; ++i) {
const int index = (window->start + i) % window->max_size;
dprintf(" %0d: data[%d] = %d", i, index, window->data[index]);
}
dprintf("---");
}
*/

VALUE
semian_simple_sliding_window_reject(VALUE self)
{
Expand All @@ -409,24 +418,30 @@ semian_simple_sliding_window_reject(VALUE self)

sem_meta_lock(res->sem_id);
{
// Store these values because we're going to be modifying the buffer.
int start = res->shmem->start;
int length = res->shmem->length;
dprintf("reject! - start:%d end:%d length:%d max_size:%d", res->shmem->start, res->shmem->end, res->shmem->length, res->shmem->max_size);

int cleared = 0;
for (int i = 0; i < length; ++i) {
int index = (start + i) % length;
int value = res->shmem->data[index];
VALUE y = rb_yield(RB_INT2NUM(value));
if (RTEST(y)) {
if (cleared++ != i) {
sem_meta_unlock(res->sem_id);
rb_raise(rb_eArgError, "reject! must delete monotonically");
semian_simple_sliding_window_shared_t *window = res->shmem;
const int start = window->start;
const int length = window->length;
const int max_size = window->max_size;

if (max_size && length) {
int wptr = (start + length + max_size - 1) % max_size;

dprintf("Before reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size);
for (int i = 0; i < length; ++i) {
const int rptr = (start + length + max_size - i - 1) % max_size;

const int value = window->data[rptr];
if (RTEST(rb_yield(RB_INT2NUM(value)))) {
window->length--;
window->data[wptr] = value;
} else {
window->data[wptr] = value;
wptr = (wptr + max_size - 1) % max_size;
}
res->shmem->start = (res->shmem->start + 1) % res->shmem->length;
res->shmem->length--;
}

window->start = (wptr + 1) % max_size;
dprintf("After reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size);
}
}
sem_meta_unlock(res->sem_id);
Expand All @@ -441,18 +456,18 @@ semian_simple_sliding_window_push(VALUE self, VALUE value)

sem_meta_lock(res->sem_id);
{
dprintf("Before: start:%d end:%d length:%d max_size:%d", res->shmem->start, res->shmem->end, res->shmem->length, res->shmem->max_size);
dprintf("Before: start:%d length:%d max_size:%d", res->shmem->start, res->shmem->length, res->shmem->max_size);
// If the window is full, make room by popping off the front.
if (res->shmem->length == res->shmem->max_size) {
res->shmem->length--;
res->shmem->start = (res->shmem->start + 1) % res->shmem->max_size;
}

// Push onto the back of the window.
int index = (res->shmem->start + res->shmem->length) % res->shmem->max_size;
res->shmem->length++;
res->shmem->data[res->shmem->end] = RB_NUM2INT(value);
dprintf("Pushed %d onto data[%d] (length %d)", RB_NUM2INT(value), res->shmem->end, res->shmem->length);
res->shmem->end = (res->shmem->end + 1) % res->shmem->max_size;
res->shmem->data[index] = RB_NUM2INT(value);
dprintf("Pushed %d onto data[%d] (length %d)", RB_NUM2INT(value), index, res->shmem->length);
}
sem_meta_unlock(res->sem_id);

Expand Down
1 change: 0 additions & 1 deletion ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ typedef struct {
int max_size;
int length;
int start;
int end;
int data[SLIDING_WINDOW_MAX_SIZE];
} semian_simple_sliding_window_shared_t;

Expand Down
7 changes: 6 additions & 1 deletion ext/semian/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <stdarg.h>
#include <stdio.h>
#include <time.h>

#include <openssl/sha.h>
#include <ruby.h>
Expand All @@ -16,7 +17,11 @@
#define dprintf(fmt, ...) \
do { \
if (DEBUG_TEST) { \
printf("[DEBUG] %s:%d - " fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
const pid_t pid = getpid(); \
struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \
struct tm t; localtime_r(&(ts.tv_sec), &t); \
char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \
printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
} while (0)

Expand Down
1 change: 1 addition & 0 deletions lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ class CircuitBreaker #:nodoc:
extend Forwardable

def_delegators :@state, :closed?, :open?, :half_open?
def_delegators :@errors, :size, :max_size, :values

attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error

Expand Down
2 changes: 1 addition & 1 deletion lib/semian/protected_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class ProtectedResource

def_delegators :@bulkhead, :destroy, :count, :semid, :tickets, :registered_workers
def_delegators :@circuit_breaker, :reset, :mark_failed, :mark_success, :request_allowed?,
:open?, :closed?, :half_open?
:open?, :closed?, :half_open?, :size, :max_size, :values

attr_reader :bulkhead, :circuit_breaker, :name
attr_accessor :updated_at
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ def count
0
end

def size
0
end

def max_size
0
end

def values
[]
end

def tickets
0
end
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/unprotected_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ def count
0
end

def size
0
end

def max_size
0
end

def values
[]
end

def semid
0
end
Expand Down
30 changes: 30 additions & 0 deletions test/circuit_breaker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,35 @@ def setup
end
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
@resource = Semian[id]
@resource.reset
end

def test_destroy
id = Time.now.strftime('%H:%M:%S.%N')

# Create the resource and check that it was reset.
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
resource = Semian[id]
assert_equal(0, resource.size)
assert_equal(2, resource.max_size)
assert_equal([], resource.values)

# Open the circuit.
open_circuit!(resource, 2)
assert_equal(2, resource.size)
assert_equal(2, resource.max_size)

# Destroy the resource and check that it was destroyed.
Semian.destroy(id)
resource = Semian[id]
assert_nil(resource, "Resource was not destroyed")

# Re-create the resource and check that it was reset.
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
resource = Semian[id]
assert_equal(0, resource.size)
assert_equal(2, resource.max_size)
assert_equal([], resource.values)
end

def test_acquire_yield_when_the_circuit_is_closed
Expand Down Expand Up @@ -41,6 +70,7 @@ def test_after_error_timeout_is_elapsed_requests_are_attempted_again
end

def test_until_success_threshold_is_reached_a_single_error_will_reopen_the_circuit
assert_equal(0, @resource.size)
half_open_cicuit!
trigger_error!
assert_circuit_opened
Expand Down
4 changes: 3 additions & 1 deletion test/resource_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,12 @@ def test_sem_undo
end
end

# TODO(michaelkipper): Shouldn't need to rescue InternalError, this test
# should deterministically throw SyscallError.
def test_destroy
resource = create_resource :testing, tickets: 1
resource.destroy
assert_raises Semian::SyscallError do
assert_raises(Semian::InternalError, Semian::SyscallError) do
resource.acquire {}
end
end
Expand Down
Loading