Skip to content

Commit

Permalink
Use gRPC naming scheme for name server address (#408)
Browse files Browse the repository at this point in the history
* Use gRPC naming scheme for name server address

* Pin bazel version to LTS 4.2.0
  • Loading branch information
lizhanhui authored Mar 10, 2022
1 parent ab3fdf9 commit 571c7b7
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 97 deletions.
1 change: 1 addition & 0 deletions .bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4.2.0
2 changes: 1 addition & 1 deletion example/rocketmq/ExampleProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) {
DefaultMQProducer producer("TestGroup");

const char* topic = "cpp_sdk_standard";
const char* name_server = "121.43.42.193:80";
const char* name_server = "mq-inst-1080056302921134-bxuibml7.mq.cn-hangzhou.aliyuncs.com:80";

producer.setNamesrvAddr(name_server);
producer.compressBodyThreshold(256);
Expand Down
3 changes: 2 additions & 1 deletion example/rocketmq/ExamplePushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ int main(int argc, char* argv[]) {
const char* group_id = "GID_cpp_sdk_standard";
const char* topic = "cpp_sdk_standard";
const char* resource_namespace = "MQ_INST_1080056302921134_BXuIbML7";
const char* name_server = "mq-inst-1080056302921134-bxuibml7.mq.cn-hangzhou.aliyuncs.com:80";

DefaultMQPushConsumer push_consumer(group_id);
push_consumer.setResourceNamespace(resource_namespace);
push_consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
push_consumer.setNamesrvAddr("121.43.42.193:80");
push_consumer.setNamesrvAddr(name_server);
MessageListener* listener = new SampleMQMessageListener;
push_consumer.setInstanceName("instance_0");
push_consumer.subscribe(topic, "*");
Expand Down
3 changes: 2 additions & 1 deletion src/main/cpp/client/ClientManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata,
const QueryRouteRequest& request, std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {

SPDLOG_DEBUG("Name server connection URL: {}", target_host);
SPDLOG_DEBUG("Query route request: {}", request.DebugString());
RpcClientSharedPtr client = getRpcClient(target_host, false);
if (!client) {
SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host);
Expand Down
32 changes: 21 additions & 11 deletions src/main/cpp/rocketmq/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "InvocationContext.h"
#include "LoggerImpl.h"
#include "MessageAccessor.h"
#include "NamingScheme.h"
#include "Signature.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MessageListener.h"
Expand Down Expand Up @@ -160,7 +161,24 @@ void ClientImpl::getRouteFor(const std::string& topic,
void ClientImpl::setAccessPoint(rmq::Endpoints* endpoints) {
std::vector<std::pair<std::string, std::uint16_t>> pairs;
{
std::vector<std::string> name_server_list = name_server_resolver_->resolve();
std::string naming_address = name_server_resolver_->resolve();
absl::string_view host_port_csv;

if (absl::StartsWith(naming_address, NamingScheme::DnsPrefix)) {
endpoints->set_scheme(rmq::AddressScheme::DOMAIN_NAME);
host_port_csv = absl::StripPrefix(naming_address, NamingScheme::DnsPrefix);
} else if (absl::StartsWith(naming_address, NamingScheme::IPv4Prefix)) {
endpoints->set_scheme(rmq::AddressScheme::IPv4);
host_port_csv = absl::StripPrefix(naming_address, NamingScheme::IPv4Prefix);
} else if (absl::StartsWith(naming_address, NamingScheme::IPv6Prefix)) {
endpoints->set_scheme(rmq::AddressScheme::IPv6);
host_port_csv = absl::StripPrefix(naming_address, NamingScheme::IPv6Prefix);
} else {
SPDLOG_WARN("Unsupported naming scheme");
}

std::vector<std::string> name_server_list = absl::StrSplit(host_port_csv, ',');

for (const auto& name_server_item : name_server_list) {
std::string::size_type pos = name_server_item.rfind(':');
if (std::string::npos == pos) {
Expand All @@ -179,20 +197,12 @@ void ClientImpl::setAccessPoint(rmq::Endpoints* endpoints) {
address->set_host(host_port.first);
endpoints->mutable_addresses()->AddAllocated(address);
}

if (MixAll::isIPv4(pairs.begin()->first)) {
endpoints->set_scheme(rmq::AddressScheme::IPv4);
} else if (absl::StrContains(pairs.begin()->first, ':')) {
endpoints->set_scheme(rmq::AddressScheme::IPv6);
} else {
endpoints->set_scheme(rmq::AddressScheme::DOMAIN_NAME);
}
}
}

void ClientImpl::fetchRouteFor(const std::string& topic,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
std::string name_server = name_server_resolver_->current();
std::string name_server = name_server_resolver_->resolve();
if (name_server.empty()) {
SPDLOG_WARN("No name server available");
return;
Expand All @@ -201,7 +211,7 @@ void ClientImpl::fetchRouteFor(const std::string& topic,
auto callback = [this, topic, name_server, cb](const std::error_code& ec, const TopicRouteDataPtr& route) {
if (ec) {
SPDLOG_WARN("Failed to resolve route for topic={} from {}", topic, name_server);
std::string name_server_changed = name_server_resolver_->next();
std::string name_server_changed = name_server_resolver_->resolve();
if (!name_server_changed.empty()) {
SPDLOG_INFO("Change current name server from {} to {}", name_server, name_server_changed);
}
Expand Down
21 changes: 3 additions & 18 deletions src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include <memory>

#include "absl/strings/str_join.h"
#include "spdlog/spdlog.h"

#include "LoggerImpl.h"
#include "SchedulerImpl.h"

ROCKETMQ_NAMESPACE_BEGIN
Expand Down Expand Up @@ -69,7 +69,7 @@ DynamicNameServerResolver::DynamicNameServerResolver(absl::string_view endpoint,
std::string(remains.data(), remains.length()));
}

std::vector<std::string> DynamicNameServerResolver::resolve() {
std::string DynamicNameServerResolver::resolve() {
bool fetch_immediately = false;
{
absl::MutexLock lk(&name_server_list_mtx_);
Expand All @@ -84,7 +84,7 @@ std::vector<std::string> DynamicNameServerResolver::resolve() {

{
absl::MutexLock lk(&name_server_list_mtx_);
return name_server_list_;
return naming_scheme_.buildAddress(name_server_list_);
}
}

Expand Down Expand Up @@ -126,19 +126,4 @@ void DynamicNameServerResolver::shutdown() {
scheduler_->shutdown();
}

std::string DynamicNameServerResolver::current() {
absl::MutexLock lk(&name_server_list_mtx_);
if (name_server_list_.empty()) {
return std::string();
}

std::uint32_t index = index_.load(std::memory_order_relaxed) % name_server_list_.size();
return name_server_list_[index];
}

std::string DynamicNameServerResolver::next() {
index_.fetch_add(1, std::memory_order_relaxed);
return current();
}

ROCKETMQ_NAMESPACE_END
81 changes: 81 additions & 0 deletions src/main/cpp/rocketmq/NamingScheme.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "NamingScheme.h"

#include <cstdint>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"

ROCKETMQ_NAMESPACE_BEGIN

const char* NamingScheme::DnsPrefix = "dns:";
const char* NamingScheme::IPv4Prefix = "ipv4:";
const char* NamingScheme::IPv6Prefix = "ipv6:";

const char* NamingScheme::IPv4Regex =
"(([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])";

const char* NamingScheme::IPv6Regex = "((([0-9a-fA-F]){1,4})\\:){7}([0-9a-fA-F]){1,4}";

NamingScheme::NamingScheme() : ipv4_pattern_(IPv4Regex), ipv6_pattern_(IPv6Regex) {
}

std::string NamingScheme::buildAddress(const std::vector<std::string>& list) {
absl::flat_hash_map<std::string, std::uint32_t> ipv4;
absl::flat_hash_map<std::string, std::uint32_t> ipv6;

for (const auto& segment : list) {
std::vector<std::string> host_port = absl::StrSplit(segment, ':');
if (2 != host_port.size()) {
continue;
}

if (re2::RE2::FullMatch(host_port[0], ipv4_pattern_)) {
std::uint32_t port;
if (absl::SimpleAtoi(host_port[1], &port)) {
ipv4.insert_or_assign(host_port[0], port);
}
continue;
}

if (re2::RE2::FullMatch(host_port[0], ipv6_pattern_)) {
std::uint32_t port;
if (absl::SimpleAtoi(host_port[1], &port)) {
ipv6.insert_or_assign(host_port[0], port);
}
continue;
}

// Once we find a domain name record, use it as the final result.
host_port.insert(host_port.begin(), "dns");
return absl::StrJoin(host_port, ":");
}

if (!ipv4.empty()) {
return "ipv4:" + absl::StrJoin(ipv4, ",", absl::PairFormatter(":"));
}

if (!ipv6.empty()) {
return "ipv6:" + absl::StrJoin(ipv4, ",", absl::PairFormatter(":"));
}
return std::string();
}

ROCKETMQ_NAMESPACE_END
27 changes: 11 additions & 16 deletions src/main/cpp/rocketmq/StaticNameServerResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,22 @@
#include "StaticNameServerResolver.h"

#include "absl/strings/str_split.h"
#include <atomic>
#include <cstdint>

ROCKETMQ_NAMESPACE_BEGIN

StaticNameServerResolver::StaticNameServerResolver(absl::string_view name_server_list)
: name_server_list_(absl::StrSplit(name_server_list, ';')) {
}
#include "LoggerImpl.h"

std::string StaticNameServerResolver::current() {
std::uint32_t index = index_.load(std::memory_order_relaxed) % name_server_list_.size();
return name_server_list_[index];
}
ROCKETMQ_NAMESPACE_BEGIN

std::string StaticNameServerResolver::next() {
index_.fetch_add(1, std::memory_order_relaxed);
return current();
StaticNameServerResolver::StaticNameServerResolver(absl::string_view name_server_list) {
std::vector<std::string> segments = absl::StrSplit(name_server_list, ';');
name_server_address_ = naming_scheme_.buildAddress(segments);
if (name_server_address_.empty()) {
SPDLOG_WARN("Failed to create gRPC naming scheme compliant address from {}",
std::string(name_server_list.data(), name_server_list.length()));
}
}

std::vector<std::string> StaticNameServerResolver::resolve() {
return name_server_list_;
std::string StaticNameServerResolver::resolve() {
return name_server_address_;
}

ROCKETMQ_NAMESPACE_END
9 changes: 4 additions & 5 deletions src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "absl/synchronization/mutex.h"

#include "NameServerResolver.h"
#include "NamingScheme.h"
#include "Scheduler.h"
#include "TopAddressing.h"

Expand All @@ -44,11 +45,7 @@ class DynamicNameServerResolver : public NameServerResolver,

void shutdown() override;

std::string current() override LOCKS_EXCLUDED(name_server_list_mtx_);

std::string next() override LOCKS_EXCLUDED(name_server_list_mtx_);

std::vector<std::string> resolve() override LOCKS_EXCLUDED(name_server_list_mtx_);
std::string resolve() override LOCKS_EXCLUDED(name_server_list_mtx_);

void injectHttpClient(std::unique_ptr<HttpClient> http_client);

Expand All @@ -69,6 +66,8 @@ class DynamicNameServerResolver : public NameServerResolver,

bool ssl_{false};
std::unique_ptr<TopAddressing> top_addressing_;

NamingScheme naming_scheme_;
};

ROCKETMQ_NAMESPACE_END
6 changes: 1 addition & 5 deletions src/main/cpp/rocketmq/include/NameServerResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ class NameServerResolver {

virtual void shutdown() = 0;

virtual std::string next() = 0;

virtual std::string current() = 0;

virtual std::vector<std::string> resolve() = 0;
virtual std::string resolve() = 0;
};

ROCKETMQ_NAMESPACE_END
46 changes: 46 additions & 0 deletions src/main/cpp/rocketmq/include/NamingScheme.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <vector>

#include "re2/re2.h"

#include "rocketmq/RocketMQ.h"

ROCKETMQ_NAMESPACE_BEGIN

class NamingScheme {
public:
NamingScheme();

std::string buildAddress(const std::vector<std::string>& list);

static const char* DnsPrefix;
static const char* IPv4Prefix;
static const char* IPv6Prefix;

private:
static const char* IPv4Regex;
static const char* IPv6Regex;

re2::RE2 ipv4_pattern_;
re2::RE2 ipv6_pattern_;
};

ROCKETMQ_NAMESPACE_END
Loading

0 comments on commit 571c7b7

Please sign in to comment.