From b5e01e0e6c56392072773f25c87c20f5871a63ab Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Tue, 3 Sep 2013 14:24:30 +0200 Subject: [PATCH 1/6] added example query calls from multithreaded context --- demo/main.cpp | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/demo/main.cpp b/demo/main.cpp index 0372c67..d7bf3f9 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -110,6 +110,29 @@ struct client_ssl_functor_t cql::cql_client_t::cql_log_callback_t _log_callback; }; + +static void* workThread( void* args ) { + cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args; + + while ( true ) { + // execute a query, select all rows from the keyspace + boost::shared_future future = pool->query("SELECT * from schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); + + // wait for the query to execute + future.wait(); + + // check whether the query succeeded + std::cout << "select successfull? " << (!future.get().error.is_err() ? "true" : "false") << std::endl; + if (future.get().result) { + // print the rows return by the successful query + print_rows(*future.get().result); + } + } + + return 0; +} + + int main(int argc, char**) @@ -176,6 +199,15 @@ main(int argc, print_rows(*future.get().result); } + // More of the same from a multithreaded context. + for (int i = 0; i < 10; i++) { + pthread_t pid; + pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); + pthread_detach( pid ); + } + + sleep (60); + // close the connection pool pool->close(); } From a0c108513db2256e23ff769a80fa00b0bf591240 Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Tue, 3 Sep 2013 17:11:47 +0200 Subject: [PATCH 2/6] fixed connection pool init --- demo/main.cpp | 48 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/demo/main.cpp b/demo/main.cpp index d7bf3f9..61db483 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -110,19 +110,27 @@ struct client_ssl_functor_t cql::cql_client_t::cql_log_callback_t _log_callback; }; +static bool terminate = false; static void* workThread( void* args ) { cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args; - while ( true ) { + while ( !terminate ) { // execute a query, select all rows from the keyspace - boost::shared_future future = pool->query("SELECT * from schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); + boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); // wait for the query to execute future.wait(); // check whether the query succeeded std::cout << "select successfull? " << (!future.get().error.is_err() ? "true" : "false") << std::endl; + + if (future.get().error.is_err()) { + // Don't let this disappear in the log messages. + std::cout << "CATASTROPHIC ERROR: " << future.get().error.message << std::endl; + exit(0); + } + if (future.get().result) { // print the rows return by the successful query print_rows(*future.get().result); @@ -165,18 +173,24 @@ main(int argc, // Construct the pool std::auto_ptr pool(cql::cql_client_pool_factory_t::create_client_pool_t(client_factory, NULL, NULL)); - // Add a client to the pool, this operation returns a future. - boost::shared_future connect_future = pool->add_client("localhost", 9042); - - // Wait until the connection is complete, or has failed. - connect_future.wait(); + int numthreads = 10; - // Check whether or not the connection was successful. - std::cout << "connect successfull? "; - if (!connect_future.get().error.is_err()) { - // The connection succeeded - std::cout << "TRUE" << std::endl; + for (int i = 0; i < numthreads; i++) { + // Add a client to the pool, this operation returns a future. + boost::shared_future connect_future = pool->add_client("localhost", 9042); + + // Wait until the connection is complete, or has failed. + connect_future.wait(); + + // Check whether or not the connection was successful. + std::cout << "connect successfull? "; + if (!connect_future.get().error.is_err()) { + // The connection succeeded + std::cout << "TRUE" << std::endl; + } + } + if (pool->size() > 0) { // execute a query, switch keyspaces boost::shared_future future = pool->query("USE system;", cql::CQL_CONSISTENCY_ONE); @@ -200,13 +214,19 @@ main(int argc, } // More of the same from a multithreaded context. - for (int i = 0; i < 10; i++) { + for (int i = 0; i < numthreads; i++) { pthread_t pid; pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); pthread_detach( pid ); } - sleep (60); + // Let the threads run for a bit. + sleep (600); + terminate = true; + std::cout << "WRAPPING UP" << std::endl; + + // give all threads the chance to finish + sleep(3); // close the connection pool pool->close(); From d6e37bb245fe665c6871a9075398eaecf524da0e Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Tue, 3 Sep 2013 17:38:56 +0200 Subject: [PATCH 3/6] changed pthread use to explicit join --- demo/main.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/demo/main.cpp b/demo/main.cpp index 61db483..5e38cff 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -173,12 +173,12 @@ main(int argc, // Construct the pool std::auto_ptr pool(cql::cql_client_pool_factory_t::create_client_pool_t(client_factory, NULL, NULL)); - int numthreads = 10; + const int numthreads = 10; for (int i = 0; i < numthreads; i++) { // Add a client to the pool, this operation returns a future. boost::shared_future connect_future = pool->add_client("localhost", 9042); - + // Wait until the connection is complete, or has failed. connect_future.wait(); @@ -214,19 +214,25 @@ main(int argc, } // More of the same from a multithreaded context. + pthread_t pids[numthreads]; + for (int i = 0; i < numthreads; i++) { pthread_t pid; pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); - pthread_detach( pid ); + pids[i] = pid; } // Let the threads run for a bit. sleep (600); + terminate = true; std::cout << "WRAPPING UP" << std::endl; // give all threads the chance to finish - sleep(3); + for (int i = 0; i < numthreads; i++) { + void *status; + pthread_join(pids[i], &status); + } // close the connection pool pool->close(); From cf47bf0b17431a85eb719e6b42ccccedf6c9b0fe Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Wed, 4 Sep 2013 08:27:42 +0200 Subject: [PATCH 4/6] Added a comment to emphasize that the purpose of this demo extension is to highlight a potential bug inside libcql. As far as I know libcql is meant to be thread safe, but this demo shows otherwise. A bunch of concurrent calls to pool->query() inevitably result in an exception "The associated promise has been destructed prior to the associated state becoming ready." Hopefully, this will get the community interested in helping me debugging the demo and underlying libcql calls... --- demo/main.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/demo/main.cpp b/demo/main.cpp index 5e38cff..3dd949b 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -117,7 +117,12 @@ static void* workThread( void* args ) { while ( !terminate ) { // execute a query, select all rows from the keyspace - boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); + + // NOTE: This call will result in a race condition within a few seconds after starting + // execution. "The associated promise has been destructed prior to the associated + // state becoming ready." + + boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); // wait for the query to execute future.wait(); From 462efb7fb43a0ac5f866eab4333a77f6b6251210 Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Wed, 4 Sep 2013 13:15:18 +0200 Subject: [PATCH 5/6] Make sure competing threads don't end up with the same stream id and overwrite each others callback functions. This fixes a critical bug reported in https://github.com/mstump/libcql/issues/44 --- include/libcql/internal/cql_callback_storage.hpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/libcql/internal/cql_callback_storage.hpp b/include/libcql/internal/cql_callback_storage.hpp index 18577b9..ad3cfcf 100644 --- a/include/libcql/internal/cql_callback_storage.hpp +++ b/include/libcql/internal/cql_callback_storage.hpp @@ -91,6 +91,8 @@ namespace cql { void set_allocated() { + // Something went badly wrong if the object is already allocated. + assert(e.next_free.index != -2); e.next_free.index = -2; } }; @@ -98,6 +100,7 @@ namespace cql { typedef entry_t array_entry_t; array_entry_t* array; int32_t next_free_index; + boost::mutex _mutex; public: explicit small_indexed_storage(uint16_t size) : @@ -116,6 +119,8 @@ namespace cql { allocate() { int32_t result; + boost::mutex::scoped_lock lock(_mutex); + if ( (result = next_free_index) >= 0) { if (array[next_free_index].next_free_cnt() > 0) { array[++next_free_index].set_next_free(array[result].next_free_index(), array[result].next_free_cnt()-1); @@ -133,6 +138,8 @@ namespace cql { void release(int32_t index) { + boost::mutex::scoped_lock lock(_mutex); + array[index].set_next_free(next_free_index); next_free_index = index; } From fd56891ed93c1f62f82b27242fd5a022e22da644 Mon Sep 17 00:00:00 2001 From: githubmonkey Date: Wed, 4 Sep 2013 13:33:40 +0200 Subject: [PATCH 6/6] Remove comment that refers to a potential bug. Also reduce runtime and thread number to something more reasonable. This was exaggerated to demonstrate the bug but the problem has been addressed. --- demo/main.cpp | 109 +++++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 55 deletions(-) diff --git a/demo/main.cpp b/demo/main.cpp index 3dd949b..099575c 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -113,36 +113,31 @@ struct client_ssl_functor_t static bool terminate = false; static void* workThread( void* args ) { - cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args; - - while ( !terminate ) { - // execute a query, select all rows from the keyspace - - // NOTE: This call will result in a race condition within a few seconds after starting - // execution. "The associated promise has been destructed prior to the associated - // state becoming ready." - - boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); - - // wait for the query to execute + cql::cql_client_pool_t* pool = (cql::cql_client_pool_t*) args; + + while ( !terminate ) { + // execute a query, select all rows from the keyspace + boost::shared_future future = pool->query("SELECT * from system.schema_keyspaces;", cql::CQL_CONSISTENCY_ONE); + + // wait for the query to execute future.wait(); // check whether the query succeeded std::cout << "select successfull? " << (!future.get().error.is_err() ? "true" : "false") << std::endl; - if (future.get().error.is_err()) { - // Don't let this disappear in the log messages. - std::cout << "CATASTROPHIC ERROR: " << future.get().error.message << std::endl; - exit(0); - } - + if (future.get().error.is_err()) { + // Don't let this disappear in the log messages. + std::cout << "CATASTROPHIC ERROR: " << future.get().error.message << std::endl; + exit(1); + } + if (future.get().result) { // print the rows return by the successful query print_rows(*future.get().result); } - } - - return 0; + } + + return 0; } @@ -178,24 +173,28 @@ main(int argc, // Construct the pool std::auto_ptr pool(cql::cql_client_pool_factory_t::create_client_pool_t(client_factory, NULL, NULL)); - const int numthreads = 10; + const int numthreads = 3; - for (int i = 0; i < numthreads; i++) { - // Add a client to the pool, this operation returns a future. - boost::shared_future connect_future = pool->add_client("localhost", 9042); + // TODO: What yields the best performance if the number of work threads is larger that + // that of available backends? One connection per backend in the pool or enouch + // identical connections to serve all client threads? - // Wait until the connection is complete, or has failed. - connect_future.wait(); + for (int i = 0; i < numthreads; i++) { + // Add a client to the pool, this operation returns a future. + boost::shared_future connect_future = pool->add_client("localhost", 9042); + + // Wait until the connection is complete, or has failed. + connect_future.wait(); - // Check whether or not the connection was successful. - std::cout << "connect successfull? "; - if (!connect_future.get().error.is_err()) { - // The connection succeeded - std::cout << "TRUE" << std::endl; - } - } + // Check whether or not the connection was successful. + std::cout << "connect successfull? "; + if (!connect_future.get().error.is_err()) { + // The connection succeeded + std::cout << "TRUE" << std::endl; + } + } - if (pool->size() > 0) { + if (pool->size() > 0) { // execute a query, switch keyspaces boost::shared_future future = pool->query("USE system;", cql::CQL_CONSISTENCY_ONE); @@ -218,26 +217,26 @@ main(int argc, print_rows(*future.get().result); } - // More of the same from a multithreaded context. - pthread_t pids[numthreads]; - - for (int i = 0; i < numthreads; i++) { - pthread_t pid; - pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); - pids[i] = pid; - } - - // Let the threads run for a bit. - sleep (600); - - terminate = true; - std::cout << "WRAPPING UP" << std::endl; - - // give all threads the chance to finish - for (int i = 0; i < numthreads; i++) { - void *status; - pthread_join(pids[i], &status); - } + // More of the same from a multithreaded context. + pthread_t pids[numthreads]; + + for (int i = 0; i < numthreads; i++) { + pthread_t pid; + pthread_create( &pid, NULL, workThread, (void*) (pool.get()) ); + pids[i] = pid; + } + + // Let the threads run for a bit. + sleep (5); + + terminate = true; + std::cout << "WRAPPING UP" << std::endl; + + // give all threads the chance to finish + for (int i = 0; i < numthreads; i++) { + void *status; + pthread_join(pids[i], &status); + } // close the connection pool pool->close();