diff --git a/demo/main.cpp b/demo/main.cpp index 0372c67..099575c 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -110,6 +110,37 @@ struct client_ssl_functor_t cql::cql_client_t::cql_log_callback_t _log_callback; }; +static bool terminate = false; + +static void* workThread( void* args ) { + cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args; + + while ( !terminate ) { + // execute a query, select all rows from the keyspace + boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); + + // wait for the query to execute + future.wait(); + + // check whether the query succeeded + std::cout << "select successfull? " << (!future.get().error.is_err() ? "true" : "false") << std::endl; + + if (future.get().error.is_err()) { + // Don't let this disappear in the log messages. + std::cout << "CATASTROPHIC ERROR: " << future.get().error.message << std::endl; + exit(1); + } + + if (future.get().result) { + // print the rows return by the successful query + print_rows(*future.get().result); + } + } + + return 0; +} + + int main(int argc, char**) @@ -142,18 +173,28 @@ main(int argc, // Construct the pool std::auto_ptr pool(cql::cql_client_pool_factory_t::create_client_pool_t(client_factory, NULL, NULL)); - // Add a client to the pool, this operation returns a future. - boost::shared_future connect_future = pool->add_client("localhost", 9042); + const int numthreads = 3; + + // TODO: What yields the best performance if the number of work threads is larger that + // that of available backends? One connection per backend in the pool or enouch + // identical connections to serve all client threads? - // Wait until the connection is complete, or has failed. - connect_future.wait(); + for (int i = 0; i < numthreads; i++) { + // Add a client to the pool, this operation returns a future. + boost::shared_future connect_future = pool->add_client("localhost", 9042); + + // Wait until the connection is complete, or has failed. + connect_future.wait(); - // Check whether or not the connection was successful. - std::cout << "connect successfull? "; - if (!connect_future.get().error.is_err()) { - // The connection succeeded - std::cout << "TRUE" << std::endl; + // Check whether or not the connection was successful. + std::cout << "connect successfull? "; + if (!connect_future.get().error.is_err()) { + // The connection succeeded + std::cout << "TRUE" << std::endl; + } + } + if (pool->size() > 0) { // execute a query, switch keyspaces boost::shared_future future = pool->query("USE system;", cql::CQL_CONSISTENCY_ONE); @@ -176,6 +217,27 @@ main(int argc, print_rows(*future.get().result); } + // More of the same from a multithreaded context. + pthread_t pids[numthreads]; + + for (int i = 0; i < numthreads; i++) { + pthread_t pid; + pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); + pids[i] = pid; + } + + // Let the threads run for a bit. + sleep (5); + + terminate = true; + std::cout << "WRAPPING UP" << std::endl; + + // give all threads the chance to finish + for (int i = 0; i < numthreads; i++) { + void *status; + pthread_join(pids[i], &status); + } + // close the connection pool pool->close(); } diff --git a/include/libcql/internal/cql_callback_storage.hpp b/include/libcql/internal/cql_callback_storage.hpp index 18577b9..ad3cfcf 100644 --- a/include/libcql/internal/cql_callback_storage.hpp +++ b/include/libcql/internal/cql_callback_storage.hpp @@ -91,6 +91,8 @@ namespace cql { void set_allocated() { + // Something went badly wrong if the object is already allocated. + assert(e.next_free.index != -2); e.next_free.index = -2; } }; @@ -98,6 +100,7 @@ namespace cql { typedef entry_t array_entry_t; array_entry_t* array; int32_t next_free_index; + boost::mutex _mutex; public: explicit small_indexed_storage(uint16_t size) : @@ -116,6 +119,8 @@ namespace cql { allocate() { int32_t result; + boost::mutex::scoped_lock lock(_mutex); + if ( (result = next_free_index) >= 0) { if (array[next_free_index].next_free_cnt() > 0) { array[++next_free_index].set_next_free(array[result].next_free_index(), array[result].next_free_cnt()-1); @@ -133,6 +138,8 @@ namespace cql { void release(int32_t index) { + boost::mutex::scoped_lock lock(_mutex); + array[index].set_next_free(next_free_index); next_free_index = index; }