Skip to content
Andrey Sibiryov edited this page Aug 3, 2013 · 1 revision

API requirements for Cocaine frameworks. Pseudocode.

svc = my_service_proxy("my-service-name");

// Might as well be dynamically generated using the Locator maps, if the
// language runtime provides facilities for such things (Python, Ruby, ...).
svc = Service("my-service-name");

using result_type = my_service_proxy::method::result_type;

// Each method returns a stream of result_types. Most methods will only push
// one single chunk into that stream.
stream<result_type> stream = svc.method("abc", 42);

try {
    // Stream version of future::get().
    // Blocks until a new chunk is received and returns it.
    // Throws when a choke or an error was received.
    result_type chunk = stream.next();
} catch(...) {
    // Do something useful.
}

// Blocks until either the timeout expires or a new chunk is received.
stream.wait(5.0f) throw();

// Checks if the next chunk is available for non-blocking get().
stream.is_ready() throw();

int process(result_type& chunk) {
    return 42;
}

future<int> f = stream.then([](stream<result_type>& s) {
    assert(s.is_ready() == true);

    result_type chunk = s.next();

    // Next chunk will be processed with another callback.
    s.then(some_other_callback);
    
    if(chunk_one.empty()) {
        // Can throw from inside the callback.
        throw runtime_error();
    }
    
    return process(chunk_one);
});

// The future will be ready when the first chunk is received and processed.
assert(f.get() == 42);

// Stream every chunk through a callback.
stream<int> result_stream = stream.map([](future<result_type>& f) {
    assert(f.is_ready() == true);
    return convert_to_int(f.get());
});

// gather() returns a future<vector<result_type>>. This future will be ready when
// all the chunks and a choke are received. If an error is received somewhere in the middle
// of the stream, the future will throw.
std::vector<int> ints = result_stream.gather().get();

// Reduce the stream.
future<int> result_future = stream.reduce(T initial, [](T acc, future<result_type>& f) {
    assert(f.is_ready() == true);
    acc += f.get();
});
Clone this wiki locally