diff --git a/.gitignore b/.gitignore index 2a03973..a81f27b 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,13 @@ build/ # IntelliJ .idea/ +# Qt-Creator (assuming a project called "redox") +redox.config +redox.creaor +redox.files +redox.includes +redox.creator.user + # Todo file TODO.md diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b081b6..4b474cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,7 +144,7 @@ if (examples) add_executable(binary_data_publish examples/binary_data_publish.cpp) target_link_libraries(binary_data_publish redox) - + add_executable(pub_sub examples/pub_sub.cpp) target_link_libraries(pub_sub redox) @@ -204,7 +204,7 @@ set(CPACK_PACKAGE_DESCRIPTION_FILE "${CMAKE_CURRENT_SOURCE_DIR}/README.md") set(CPACK_PACKAGE_VERSION_MAJOR "${REDOX_VERSION_MAJOR}") set(CPACK_PACKAGE_VERSION_MINOR "${REDOX_VERSION_MINOR}") set(CPACK_PACKAGE_VERSION_PATCH "${REDOX_VERSION_PATCH}") -set(CPACK_PACKAGE_VERSION_RELEASE "1") # Increase this if a failed build was published +set(CPACK_PACKAGE_VERSION_RELEASE "1.1") # Increase this if a failed build was published set(CPACK_PACKAGE_VERSION "${CPACK_PACKAGE_VERSION_MAJOR}.${CPACK_PACKAGE_VERSION_MINOR}.${CPACK_PACKAGE_VERSION_PATCH}-${CPACK_PACKAGE_VERSION_RELEASE}") set(CPACK_SYSTEM_NAME "${ARCHITECTURE}") set(CPACK_PACKAGE_FILE_NAME "${PROJECT_NAME}-${CPACK_PACKAGE_VERSION}.${ARCHITECTURE}") diff --git a/include/redox/client.hpp b/include/redox/client.hpp index 12a95bc..9e23af8 100644 --- a/include/redox/client.hpp +++ b/include/redox/client.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -372,6 +373,8 @@ class Redox { std::unordered_map *> commands_long_long_int_; std::unordered_map *> commands_null_; std::unordered_map> *> commands_vector_string_; + std::unordered_map> *> commands_map_string_; + std::unordered_map>> *> commands_vectormap_string_; std::unordered_map> *> commands_set_string_; std::unordered_map> *> commands_unordered_set_string_; @@ -390,7 +393,7 @@ class Redox { template friend void Command::free(); // Access to call disconnectedCallback - template friend void Command::processReply(redisReply *r); + template friend void Command::processReply(redisReply *r, const std::string& errorMessage); }; // ------------------------------------------------ @@ -408,7 +411,7 @@ Command &Redox::createCommand(const std::vector &cmd, } } - auto *c = new Command(this, commands_created_.fetch_add(1), cmd, + auto *c = new Command(this, commands_created_.fetch_add(1), cmd, callback, repeat, after, free_memory, logger_); std::lock_guard lg(queue_guard_); diff --git a/include/redox/command.hpp b/include/redox/command.hpp index 69b44c4..b7c6f84 100644 --- a/include/redox/command.hpp +++ b/include/redox/command.hpp @@ -105,7 +105,7 @@ template class Command { bool free_memory, log::Logger &logger); // Handles a new reply from the server - void processReply(redisReply *r); + void processReply(redisReply *r, const std::string& errorMessage = std::string()); // Invoke a user callback from the reply object. This method is specialized // for each ReplyT of Command. diff --git a/include/redox/subscriber.hpp b/include/redox/subscriber.hpp index 63a89f4..fa8d00d 100644 --- a/include/redox/subscriber.hpp +++ b/include/redox/subscriber.hpp @@ -46,7 +46,7 @@ class Subscriber { * Same as .connect() on a Redox instance. */ bool connect(const std::string &host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT, - std::function connection_callback = nullptr) { + const std::function &connection_callback = nullptr) { return rdx_.connect(host, port, connection_callback); } @@ -54,7 +54,7 @@ class Subscriber { * Same as .connectUnix() on a Redox instance. */ bool connectUnix(const std::string &path = REDIS_DEFAULT_PATH, - std::function connection_callback = nullptr) { + const std::function &connection_callback = nullptr) { return rdx_.connectUnix(path, connection_callback); } @@ -80,11 +80,11 @@ class Subscriber { * sub_callback: invoked when successfully subscribed * err_callback: invoked on some error state */ - void subscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr); + void subscribe(const std::string& topic, + const std::function &msg_callback, + const std::function &sub_callback = nullptr, + const std::function &unsub_callback = nullptr, + const std::function &err_callback = nullptr); /** * Subscribe to a topic with a pattern. @@ -93,27 +93,27 @@ class Subscriber { * sub_callback: invoked when successfully subscribed * err_callback: invoked on some error state */ - void psubscribe(const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr); + void psubscribe(const std::string& topic, + const std::function &msg_callback, + const std::function &sub_callback = nullptr, + const std::function &unsub_callback = nullptr, + const std::function &err_callback = nullptr); /** * Unsubscribe from a topic. * * err_callback: invoked on some error state */ - void unsubscribe(const std::string topic, - std::function err_callback = nullptr); + void unsubscribe(const std::string& topic, + const std::function &err_callback = nullptr); /** * Unsubscribe from a topic with a pattern. * * err_callback: invoked on some error state */ - void punsubscribe(const std::string topic, - std::function err_callback = nullptr); + void punsubscribe(const std::string& topic, + const std::function &err_callback = nullptr); /** * Return the topics that are subscribed() to. @@ -133,15 +133,15 @@ class Subscriber { private: // Base for subscribe and psubscribe - void subscribeBase(const std::string cmd_name, const std::string topic, - std::function msg_callback, - std::function sub_callback = nullptr, - std::function unsub_callback = nullptr, - std::function err_callback = nullptr); + void subscribeBase(const std::string &cmd_name, const std::string &topic, + const std::function &msg_callback, + const std::function &sub_callback = nullptr, + const std::function &unsub_callback = nullptr, + const std::function &err_callback = nullptr); // Base for unsubscribe and punsubscribe - void unsubscribeBase(const std::string cmd_name, const std::string topic, - std::function err_callback = nullptr); + void unsubscribeBase(const std::string &cmd_name, const std::string &topic, + const std::function &err_callback = nullptr); // Underlying Redis client Redox rdx_; diff --git a/src/client.cpp b/src/client.cpp index 4401035..00b3a7a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -365,17 +365,27 @@ template Command *Redox::findCommand(long id) { template void Redox::commandCallback(redisAsyncContext *ctx, void *r, void *privdata) { - Redox *rdx = (Redox *)ctx->data; - long id = (long)privdata; - redisReply *reply_obj = (redisReply *)r; + Redox *rdx = reinterpret_cast(ctx->data); + if (rdx == nullptr) { + return; + } + redisReply *reply_obj = reinterpret_cast(r); + + long id = (long)privdata; Command *c = rdx->findCommand(id); if (c == nullptr) { - freeReplyObject(reply_obj); + if (reply_obj != nullptr) { + freeReplyObject(reply_obj); + } return; } - c->processReply(reply_obj); + if (ctx->err) { + c->processReply(nullptr, std::string(ctx->errstr)); + } else { + c->processReply(reply_obj); + } } template bool Redox::submitToServer(Command *c) { @@ -460,6 +470,8 @@ void Redox::processQueuedCommands(struct ev_loop *loop, ev_async *async, int rev } else if (rdx->processQueuedCommand(id)) { } else if (rdx->processQueuedCommand>(id)) { } else if (rdx->processQueuedCommand>(id)) { + } else if (rdx->processQueuedCommand>(id)) { + } else if (rdx->processQueuedCommand>>(id)) { } else if (rdx->processQueuedCommand>(id)) { } else throw runtime_error("Command pointer not found in any queue!"); @@ -484,6 +496,8 @@ void Redox::freeQueuedCommands(struct ev_loop *loop, ev_async *async, int revent } else if (rdx->freeQueuedCommand(id)) { } else if (rdx->freeQueuedCommand>(id)) { } else if (rdx->freeQueuedCommand>(id)) { + } else if (rdx->freeQueuedCommand>(id)) { + } else if (rdx->freeQueuedCommand>>(id)) { } else if (rdx->freeQueuedCommand>(id)) { } else { } @@ -511,10 +525,16 @@ template bool Redox::freeQueuedCommand(long id) { } long Redox::freeAllCommands() { - return freeAllCommandsOfType() + freeAllCommandsOfType() + - freeAllCommandsOfType() + freeAllCommandsOfType() + - freeAllCommandsOfType() + freeAllCommandsOfType() + - freeAllCommandsOfType>() + freeAllCommandsOfType>() + + return freeAllCommandsOfType() + + freeAllCommandsOfType() + + freeAllCommandsOfType() + + freeAllCommandsOfType() + + freeAllCommandsOfType() + + freeAllCommandsOfType() + + freeAllCommandsOfType>() + + freeAllCommandsOfType>() + + freeAllCommandsOfType>() + + freeAllCommandsOfType>>() + freeAllCommandsOfType>(); } @@ -579,12 +599,19 @@ template <> unordered_map> *> &Redox::getCommandMap return commands_vector_string_; } -template <> unordered_map> *> &Redox::getCommandMap>() { +template <> unordered_map> *> &Redox::getCommandMap>() { + return commands_map_string_; +} + +template <> unordered_map>> *> &Redox::getCommandMap>>() { + return commands_vectormap_string_; +} + +template <> unordered_map> *> &Redox::getCommandMap>() { return commands_set_string_; } -template <> -unordered_map> *> & +template <> unordered_map> *> & Redox::getCommandMap>() { return commands_unordered_set_string_; } diff --git a/src/command.cpp b/src/command.cpp index 95907b8..848d934 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -20,13 +20,44 @@ #include #include +#include #include +#include #include "command.hpp" #include "client.hpp" using namespace std; +namespace { + +// Helper function to process replies that should convert into a map +void parseReplyObject(redisReply* reply_obj, map& reply_val) { + + if (reply_obj->type != REDIS_REPLY_ARRAY) { + return; + } + + for (size_t i = 0; i < reply_obj->elements; i = i + 2) { + redisReply *r = *(reply_obj->element + i); + size_t next = i + 1; + if (next < reply_obj->elements) { + redisReply *rnext = *(reply_obj->element + next); + reply_val.emplace( + std::piecewise_construct, + std::forward_as_tuple(r->str, r->len), + std::forward_as_tuple(rnext->str, rnext->len)); + } else { + reply_val.emplace( + std::piecewise_construct, + std::forward_as_tuple(r->str, r->len), + std::forward_as_tuple()); + } + } +} + +} + namespace redox { template @@ -44,14 +75,19 @@ template void Command::wait() { waiting_done_ = {false}; } -template void Command::processReply(redisReply *r) { +template void Command::processReply(redisReply *r, const std::string& errorMessage) { last_error_.clear(); reply_obj_ = r; if (reply_obj_ == nullptr) { reply_status_ = ERROR_REPLY; - last_error_ = "Received null redisReply* from hiredis."; + if (errorMessage.empty()) { + last_error_ = "Received null redisReply* from hiredis."; + } + else { + last_error_ = errorMessage; + } logger_.error() << last_error_; Redox::disconnectedCallback(rdx_->ctx_, REDIS_ERR); @@ -236,6 +272,27 @@ template <> void Command>::parseReplyObject() { } } +template <> void Command>::parseReplyObject() { + + if (!isExpectedReply(REDIS_REPLY_ARRAY)) + return; + + ::parseReplyObject(reply_obj_, reply_val_); +} + +template <> void Command>>::parseReplyObject() { + + if (!isExpectedReply(REDIS_REPLY_ARRAY)) + return; + + for (size_t i = 0; i < reply_obj_->elements; i++) { + redisReply *r = *(reply_obj_->element + i); + map reply_val; + ::parseReplyObject(r, reply_val); + reply_val_.push_back(reply_val); + } +} + template <> void Command>::parseReplyObject() { if (!isExpectedReply(REDIS_REPLY_ARRAY)) @@ -269,6 +326,8 @@ template class Command; template class Command; template class Command>; template class Command>; +template class Command>; +template class Command>>; template class Command>; } // End namespace redox diff --git a/src/subscriber.cpp b/src/subscriber.cpp index 8847357..32a577d 100644 --- a/src/subscriber.cpp +++ b/src/subscriber.cpp @@ -94,11 +94,11 @@ void debugReply(Command c) { cout << "------" << endl; } -void Subscriber::subscribeBase(const string cmd_name, const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback) { +void Subscriber::subscribeBase(const string &cmd_name, const string &topic, + const function &msg_callback, + const function &sub_callback, + const function &unsub_callback, + const function &err_callback) { Command &sub_cmd = rdx_.commandLoop( {cmd_name, topic}, @@ -176,11 +176,11 @@ void Subscriber::subscribeBase(const string cmd_name, const string topic, num_pending_subs_++; } -void Subscriber::subscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback) { +void Subscriber::subscribe(const string &topic, + const function &msg_callback, + const function &sub_callback, + const function &unsub_callback, + const function &err_callback) { lock_guard lg(subscribed_topics_guard_); if (subscribed_topics_.find(topic) != subscribed_topics_.end()) { logger_.warning() << "Already subscribed to " << topic << "!"; @@ -189,11 +189,11 @@ void Subscriber::subscribe(const string topic, subscribeBase("SUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); } -void Subscriber::psubscribe(const string topic, - function msg_callback, - function sub_callback, - function unsub_callback, - function err_callback) { +void Subscriber::psubscribe(const string &topic, + const function &msg_callback, + const function &sub_callback, + const function &unsub_callback, + const function &err_callback) { lock_guard lg(psubscribed_topics_guard_); if (psubscribed_topics_.find(topic) != psubscribed_topics_.end()) { logger_.warning() << "Already psubscribed to " << topic << "!"; @@ -202,8 +202,8 @@ void Subscriber::psubscribe(const string topic, subscribeBase("PSUBSCRIBE", topic, msg_callback, sub_callback, unsub_callback, err_callback); } -void Subscriber::unsubscribeBase(const string cmd_name, const string topic, - function err_callback) { +void Subscriber::unsubscribeBase(const string &cmd_name, const string &topic, + const function &err_callback) { rdx_.command({cmd_name, topic}, [topic, err_callback](Command &c) { if (!c.ok()) { if (err_callback) @@ -213,7 +213,7 @@ void Subscriber::unsubscribeBase(const string cmd_name, const string topic, }); } -void Subscriber::unsubscribe(const string topic, function err_callback) { +void Subscriber::unsubscribe(const string &topic, const function &err_callback) { lock_guard lg(subscribed_topics_guard_); if (subscribed_topics_.find(topic) == subscribed_topics_.end()) { logger_.warning() << "Cannot unsubscribe from " << topic << ", not subscribed!"; @@ -222,8 +222,8 @@ void Subscriber::unsubscribe(const string topic, function err_callback) { +void Subscriber::punsubscribe(const string &topic, + const function &err_callback) { lock_guard lg(psubscribed_topics_guard_); if (psubscribed_topics_.find(topic) == psubscribed_topics_.end()) { logger_.warning() << "Cannot punsubscribe from " << topic << ", not psubscribed!"; diff --git a/test/test.cpp b/test/test.cpp index cbafca5..713b09f 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -122,6 +122,12 @@ class RedoxTest : public ::testing::Test { c.free(); } + template void print_and_check_sync(Command &c) { + ASSERT_TRUE(c.ok()); + cout << "[SYNC] " << c.cmd() << ": " << c.reply() << endl; + c.free(); + } + template void print_and_check_sync(Command &c, const ReplyT &value) { ASSERT_TRUE(c.ok()); EXPECT_EQ(c.reply(), value); @@ -132,10 +138,9 @@ class RedoxTest : public ::testing::Test { /** * Check the error */ - template void print_and_check_error_sync(Command &c, const ReplyT &value) { + template void print_and_check_error_sync(Command &c) { EXPECT_FALSE(c.ok()); EXPECT_FALSE(c.lastError().empty()); - // EXPECT_EQ(value, c.reply()); cout << c.cmd() << ": " << c.lastError() << endl; } }; @@ -236,10 +241,65 @@ TEST_F(RedoxTest, IncrSync) { TEST_F(RedoxTest, GetSetSyncError) { connect(); print_and_check_sync(rdx.commandSync({"SET", "redox_test:a", "apple"}), "OK"); - print_and_check_error_sync(rdx.commandSync({"GET", "redox_test:a"}), 3); + print_and_check_error_sync(rdx.commandSync({"GET", "redox_test:a"})); + rdx.disconnect(); +} + +// ------------------------------------------- +// Core unit tests - reply types +// ------------------------------------------- + +TEST_F(RedoxTest, MapString) { + connect(); + + print_and_check_sync(rdx.commandSync({"HSET", "mapstring", "a", "apple"})); + print_and_check_sync(rdx.commandSync({"HSET", "mapstring", "b", "beer"})); + + auto& cmd = rdx.commandSync>({"HGETALL", "mapstring"}); + ASSERT_TRUE(cmd.ok()); + + auto resultMap = cmd.reply(); + EXPECT_EQ(2, resultMap.size()); + EXPECT_EQ("apple", resultMap["a"]); + EXPECT_EQ("beer", resultMap["b"]); + + cmd.free(); rdx.disconnect(); } +TEST_F(RedoxTest, VectorMapString) { + connect(); + + // This does not return the full command info, because COMMAND returns reply objects that are 3 levels deep. + // However, for testing purposes, this suffices. + auto& cmd = rdx.commandSync>>({"COMMAND"}); + ASSERT_TRUE(cmd.ok()); + + auto resultVector = cmd.reply(); + cout << "resultVector.size(): " << resultVector.size() << endl; + + bool foundGetCommand = false; // test if we can at least find the get command as a key + EXPECT_FALSE(resultVector.empty()); + for (const auto& resultMap : resultVector) + { + EXPECT_FALSE(resultMap.empty()); + cout << "resultMap.size(): " << resultMap.size() << endl; + + if (resultMap.find("get") != resultMap.end()) + { + foundGetCommand = true; + } + } + EXPECT_TRUE(foundGetCommand); + + cmd.free(); + rdx.disconnect(); +} + +// ------------------------------------------- +// Multithreading tests +// ------------------------------------------- + TEST_F(RedoxTest, MultithreadedCRUD) { connect(); int create_count(0);