Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ build/
# IntelliJ
.idea/

# Qt-Creator (assuming a project called "redox")
redox.config
redox.creaor
redox.files
redox.includes
redox.creator.user

# Todo file
TODO.md

Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ if (examples)

add_executable(binary_data_publish examples/binary_data_publish.cpp)
target_link_libraries(binary_data_publish redox)

add_executable(pub_sub examples/pub_sub.cpp)
target_link_libraries(pub_sub redox)

Expand Down Expand Up @@ -204,7 +204,7 @@ set(CPACK_PACKAGE_DESCRIPTION_FILE "${CMAKE_CURRENT_SOURCE_DIR}/README.md")
set(CPACK_PACKAGE_VERSION_MAJOR "${REDOX_VERSION_MAJOR}")
set(CPACK_PACKAGE_VERSION_MINOR "${REDOX_VERSION_MINOR}")
set(CPACK_PACKAGE_VERSION_PATCH "${REDOX_VERSION_PATCH}")
set(CPACK_PACKAGE_VERSION_RELEASE "1") # Increase this if a failed build was published
set(CPACK_PACKAGE_VERSION_RELEASE "1.1") # Increase this if a failed build was published
set(CPACK_PACKAGE_VERSION "${CPACK_PACKAGE_VERSION_MAJOR}.${CPACK_PACKAGE_VERSION_MINOR}.${CPACK_PACKAGE_VERSION_PATCH}-${CPACK_PACKAGE_VERSION_RELEASE}")
set(CPACK_SYSTEM_NAME "${ARCHITECTURE}")
set(CPACK_PACKAGE_FILE_NAME "${PROJECT_NAME}-${CPACK_PACKAGE_VERSION}.${ARCHITECTURE}")
Expand Down
7 changes: 5 additions & 2 deletions include/redox/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <string>
#include <queue>
#include <set>
#include <map>
#include <unordered_map>
#include <unordered_set>

Expand Down Expand Up @@ -372,6 +373,8 @@ class Redox {
std::unordered_map<long, Command<long long int> *> commands_long_long_int_;
std::unordered_map<long, Command<std::nullptr_t> *> commands_null_;
std::unordered_map<long, Command<std::vector<std::string>> *> commands_vector_string_;
std::unordered_map<long, Command<std::map<std::string,std::string>> *> commands_map_string_;
std::unordered_map<long, Command<std::vector<std::map<std::string,std::string>>> *> commands_vectormap_string_;
std::unordered_map<long, Command<std::set<std::string>> *> commands_set_string_;
std::unordered_map<long, Command<std::unordered_set<std::string>> *>
commands_unordered_set_string_;
Expand All @@ -390,7 +393,7 @@ class Redox {
template <class ReplyT> friend void Command<ReplyT>::free();

// Access to call disconnectedCallback
template <class ReplyT> friend void Command<ReplyT>::processReply(redisReply *r);
template <class ReplyT> friend void Command<ReplyT>::processReply(redisReply *r, const std::string& errorMessage);
};

// ------------------------------------------------
Expand All @@ -408,7 +411,7 @@ Command<ReplyT> &Redox::createCommand(const std::vector<std::string> &cmd,
}
}

auto *c = new Command<ReplyT>(this, commands_created_.fetch_add(1), cmd,
auto *c = new Command<ReplyT>(this, commands_created_.fetch_add(1), cmd,
callback, repeat, after, free_memory, logger_);

std::lock_guard<std::mutex> lg(queue_guard_);
Expand Down
2 changes: 1 addition & 1 deletion include/redox/command.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ template <class ReplyT> class Command {
bool free_memory, log::Logger &logger);

// Handles a new reply from the server
void processReply(redisReply *r);
void processReply(redisReply *r, const std::string& errorMessage = std::string());

// Invoke a user callback from the reply object. This method is specialized
// for each ReplyT of Command.
Expand Down
46 changes: 23 additions & 23 deletions include/redox/subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ class Subscriber {
* Same as .connect() on a Redox instance.
*/
bool connect(const std::string &host = REDIS_DEFAULT_HOST, const int port = REDIS_DEFAULT_PORT,
std::function<void(int)> connection_callback = nullptr) {
const std::function<void(int)> &connection_callback = nullptr) {
return rdx_.connect(host, port, connection_callback);
}

/**
* Same as .connectUnix() on a Redox instance.
*/
bool connectUnix(const std::string &path = REDIS_DEFAULT_PATH,
std::function<void(int)> connection_callback = nullptr) {
const std::function<void(int)> &connection_callback = nullptr) {
return rdx_.connectUnix(path, connection_callback);
}

Expand All @@ -80,11 +80,11 @@ class Subscriber {
* sub_callback: invoked when successfully subscribed
* err_callback: invoked on some error state
*/
void subscribe(const std::string topic,
std::function<void(const std::string &, const std::string &)> msg_callback,
std::function<void(const std::string &)> sub_callback = nullptr,
std::function<void(const std::string &)> unsub_callback = nullptr,
std::function<void(const std::string &, int)> err_callback = nullptr);
void subscribe(const std::string& topic,
const std::function<void(const std::string &, const std::string &)> &msg_callback,
const std::function<void(const std::string &)> &sub_callback = nullptr,
const std::function<void(const std::string &)> &unsub_callback = nullptr,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

/**
* Subscribe to a topic with a pattern.
Expand All @@ -93,27 +93,27 @@ class Subscriber {
* sub_callback: invoked when successfully subscribed
* err_callback: invoked on some error state
*/
void psubscribe(const std::string topic,
std::function<void(const std::string &, const std::string &)> msg_callback,
std::function<void(const std::string &)> sub_callback = nullptr,
std::function<void(const std::string &)> unsub_callback = nullptr,
std::function<void(const std::string &, int)> err_callback = nullptr);
void psubscribe(const std::string& topic,
const std::function<void(const std::string &, const std::string &)> &msg_callback,
const std::function<void(const std::string &)> &sub_callback = nullptr,
const std::function<void(const std::string &)> &unsub_callback = nullptr,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

/**
* Unsubscribe from a topic.
*
* err_callback: invoked on some error state
*/
void unsubscribe(const std::string topic,
std::function<void(const std::string &, int)> err_callback = nullptr);
void unsubscribe(const std::string& topic,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

/**
* Unsubscribe from a topic with a pattern.
*
* err_callback: invoked on some error state
*/
void punsubscribe(const std::string topic,
std::function<void(const std::string &, int)> err_callback = nullptr);
void punsubscribe(const std::string& topic,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

/**
* Return the topics that are subscribed() to.
Expand All @@ -133,15 +133,15 @@ class Subscriber {

private:
// Base for subscribe and psubscribe
void subscribeBase(const std::string cmd_name, const std::string topic,
std::function<void(const std::string &, const std::string &)> msg_callback,
std::function<void(const std::string &)> sub_callback = nullptr,
std::function<void(const std::string &)> unsub_callback = nullptr,
std::function<void(const std::string &, int)> err_callback = nullptr);
void subscribeBase(const std::string &cmd_name, const std::string &topic,
const std::function<void(const std::string &, const std::string &)> &msg_callback,
const std::function<void(const std::string &)> &sub_callback = nullptr,
const std::function<void(const std::string &)> &unsub_callback = nullptr,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

// Base for unsubscribe and punsubscribe
void unsubscribeBase(const std::string cmd_name, const std::string topic,
std::function<void(const std::string &, int)> err_callback = nullptr);
void unsubscribeBase(const std::string &cmd_name, const std::string &topic,
const std::function<void(const std::string &, int)> &err_callback = nullptr);

// Underlying Redis client
Redox rdx_;
Expand Down
51 changes: 39 additions & 12 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,27 @@ template <class ReplyT> Command<ReplyT> *Redox::findCommand(long id) {
template <class ReplyT>
void Redox::commandCallback(redisAsyncContext *ctx, void *r, void *privdata) {

Redox *rdx = (Redox *)ctx->data;
long id = (long)privdata;
redisReply *reply_obj = (redisReply *)r;
Redox *rdx = reinterpret_cast<Redox*>(ctx->data);
if (rdx == nullptr) {
return;
}

redisReply *reply_obj = reinterpret_cast<redisReply*>(r);

long id = (long)privdata;
Command<ReplyT> *c = rdx->findCommand<ReplyT>(id);
if (c == nullptr) {
freeReplyObject(reply_obj);
if (reply_obj != nullptr) {
freeReplyObject(reply_obj);
}
return;
}

c->processReply(reply_obj);
if (ctx->err) {
c->processReply(nullptr, std::string(ctx->errstr));
} else {
c->processReply(reply_obj);
}
}

template <class ReplyT> bool Redox::submitToServer(Command<ReplyT> *c) {
Expand Down Expand Up @@ -460,6 +470,8 @@ void Redox::processQueuedCommands(struct ev_loop *loop, ev_async *async, int rev
} else if (rdx->processQueuedCommand<nullptr_t>(id)) {
} else if (rdx->processQueuedCommand<vector<string>>(id)) {
} else if (rdx->processQueuedCommand<std::set<string>>(id)) {
} else if (rdx->processQueuedCommand<map<string,string>>(id)) {
} else if (rdx->processQueuedCommand<vector<map<string,string>>>(id)) {
} else if (rdx->processQueuedCommand<unordered_set<string>>(id)) {
} else
throw runtime_error("Command pointer not found in any queue!");
Expand All @@ -484,6 +496,8 @@ void Redox::freeQueuedCommands(struct ev_loop *loop, ev_async *async, int revent
} else if (rdx->freeQueuedCommand<nullptr_t>(id)) {
} else if (rdx->freeQueuedCommand<vector<string>>(id)) {
} else if (rdx->freeQueuedCommand<std::set<string>>(id)) {
} else if (rdx->freeQueuedCommand<map<string,string>>(id)) {
} else if (rdx->freeQueuedCommand<vector<map<string,string>>>(id)) {
} else if (rdx->freeQueuedCommand<unordered_set<string>>(id)) {
} else {
}
Expand Down Expand Up @@ -511,10 +525,16 @@ template <class ReplyT> bool Redox::freeQueuedCommand(long id) {
}

long Redox::freeAllCommands() {
return freeAllCommandsOfType<redisReply *>() + freeAllCommandsOfType<string>() +
freeAllCommandsOfType<char *>() + freeAllCommandsOfType<int>() +
freeAllCommandsOfType<long long int>() + freeAllCommandsOfType<nullptr_t>() +
freeAllCommandsOfType<vector<string>>() + freeAllCommandsOfType<std::set<string>>() +
return freeAllCommandsOfType<redisReply *>() +
freeAllCommandsOfType<string>() +
freeAllCommandsOfType<char *>() +
freeAllCommandsOfType<int>() +
freeAllCommandsOfType<long long int>() +
freeAllCommandsOfType<nullptr_t>() +
freeAllCommandsOfType<vector<string>>() +
freeAllCommandsOfType<std::set<string>>() +
freeAllCommandsOfType<map<string,string>>() +
freeAllCommandsOfType<vector<map<string,string>>>() +
freeAllCommandsOfType<unordered_set<string>>();
}

Expand Down Expand Up @@ -579,12 +599,19 @@ template <> unordered_map<long, Command<vector<string>> *> &Redox::getCommandMap
return commands_vector_string_;
}

template <> unordered_map<long, Command<set<string>> *> &Redox::getCommandMap<set<string>>() {
template <> unordered_map<long, Command<map<string,string>> *> &Redox::getCommandMap<map<string,string>>() {
return commands_map_string_;
}

template <> unordered_map<long, Command<vector<map<string,string>>> *> &Redox::getCommandMap<vector<map<string,string>>>() {
return commands_vectormap_string_;
}

template <> unordered_map<long, Command<std::set<string>> *> &Redox::getCommandMap<std::set<string>>() {
return commands_set_string_;
}

template <>
unordered_map<long, Command<unordered_set<string>> *> &
template <> unordered_map<long, Command<unordered_set<string>> *> &
Redox::getCommandMap<unordered_set<string>>() {
return commands_unordered_set_string_;
}
Expand Down
63 changes: 61 additions & 2 deletions src/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,44 @@

#include <vector>
#include <set>
#include <map>
#include <unordered_set>
#include <utility>

#include "command.hpp"
#include "client.hpp"

using namespace std;

namespace {

// Helper function to process replies that should convert into a map
void parseReplyObject(redisReply* reply_obj, map<string,string>& reply_val) {

if (reply_obj->type != REDIS_REPLY_ARRAY) {
return;
}

for (size_t i = 0; i < reply_obj->elements; i = i + 2) {
redisReply *r = *(reply_obj->element + i);
size_t next = i + 1;
if (next < reply_obj->elements) {
redisReply *rnext = *(reply_obj->element + next);
reply_val.emplace(
std::piecewise_construct,
std::forward_as_tuple(r->str, r->len),
std::forward_as_tuple(rnext->str, rnext->len));
} else {
reply_val.emplace(
std::piecewise_construct,
std::forward_as_tuple(r->str, r->len),
std::forward_as_tuple());
}
}
}

}

namespace redox {

template <class ReplyT>
Expand All @@ -44,14 +75,19 @@ template <class ReplyT> void Command<ReplyT>::wait() {
waiting_done_ = {false};
}

template <class ReplyT> void Command<ReplyT>::processReply(redisReply *r) {
template <class ReplyT> void Command<ReplyT>::processReply(redisReply *r, const std::string& errorMessage) {

last_error_.clear();
reply_obj_ = r;

if (reply_obj_ == nullptr) {
reply_status_ = ERROR_REPLY;
last_error_ = "Received null redisReply* from hiredis.";
if (errorMessage.empty()) {
last_error_ = "Received null redisReply* from hiredis.";
}
else {
last_error_ = errorMessage;
}
logger_.error() << last_error_;
Redox::disconnectedCallback(rdx_->ctx_, REDIS_ERR);

Expand Down Expand Up @@ -236,6 +272,27 @@ template <> void Command<vector<string>>::parseReplyObject() {
}
}

template <> void Command<map<string,string>>::parseReplyObject() {

if (!isExpectedReply(REDIS_REPLY_ARRAY))
return;

::parseReplyObject(reply_obj_, reply_val_);
}

template <> void Command<vector<map<string,string>>>::parseReplyObject() {

if (!isExpectedReply(REDIS_REPLY_ARRAY))
return;

for (size_t i = 0; i < reply_obj_->elements; i++) {
redisReply *r = *(reply_obj_->element + i);
map<string,string> reply_val;
::parseReplyObject(r, reply_val);
reply_val_.push_back(reply_val);
}
}

template <> void Command<unordered_set<string>>::parseReplyObject() {

if (!isExpectedReply(REDIS_REPLY_ARRAY))
Expand Down Expand Up @@ -269,6 +326,8 @@ template class Command<long long int>;
template class Command<nullptr_t>;
template class Command<vector<string>>;
template class Command<set<string>>;
template class Command<map<string,string>>;
template class Command<vector<map<string,string>>>;
template class Command<unordered_set<string>>;

} // End namespace redox
Loading