Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/qpid/dispatch/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,12 @@ extern const char * const QD_CAPABILITY_ROUTER_DATA;
extern const char * const QD_CAPABILITY_EDGE_DOWNLINK;
extern const char * const QD_CAPABILITY_STREAMING_DELIVERIES;
extern const char * const QD_CAPABILITY_RESEND_RELEASED;
extern const char * const QD_CAPABILITY_CROSS_NETWORK;
/// @}

/** @name Dynamic Node Properties */
/// @{
extern const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS; ///< Address for routing dynamic sources
extern const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS; ///< Address for routing dynamic sources
/// @}

/** @name Connection Properties */
Expand Down
6 changes: 6 additions & 0 deletions include/qpid/dispatch/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct qd_iterator_t qd_iterator_t;
*/
#define QD_ITER_HASH_PREFIX_TOPOLOGICAL 'T'
#define QD_ITER_HASH_PREFIX_LOCAL 'L'
#define QD_ITER_HASH_PREFIX_NETWORK 'N'
#define QD_ITER_HASH_PREFIX_AREA 'A'
#define QD_ITER_HASH_PREFIX_ROUTER 'R'
#define QD_ITER_HASH_PREFIX_MOBILE 'M'
Expand Down Expand Up @@ -137,6 +138,11 @@ void qd_iterator_finalize(void);
*/
void qd_iterator_set_address(bool edge_mode, const char *area, const char *router);

/**
* Set the network ID for the local router. This can be updated repeatedly during run-time.
*/
void qd_iterator_set_network(const char *network);

/**
* Add and delete peer-edge router identities. When in edge mode, peer edge routers
* result in different hash results than remote edge routers. These functions are used
Expand Down
13 changes: 9 additions & 4 deletions include/qpid/dispatch/router_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ ENUM_DECLARE(qd_router_mode);
/**
* Allocate and start an instance of the router core module.
*/
qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *van_id);
qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *tenant_id);

/**
* Set the network id
*/
void qdr_core_set_network_id(qdr_core_t *core, const char *network_id);

/**
* Stop and deallocate an instance of the router core.
Expand All @@ -75,12 +80,12 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core);
void qdr_process_tick(qdr_core_t *core);

/**
* @brief Return the text of the router's virtual application network ID, or 0.
* @brief Return the text of the router's virtual application network tenant ID, or 0.
*
* @param core Pointer to the core object returned by qdr_core()
* @return const char* null-terminated text of the van-id or 0 if there's no id.
* @return const char* null-terminated text of the tenant-id or 0 if there's no id.
*/
const char *qdr_core_van_id(const qdr_core_t *core);
const char *qdr_core_tenant_id(const qdr_core_t *core);


/**
Expand Down
16 changes: 12 additions & 4 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@
"create": true
},
"vanId": {
"description":"Deprecated: See the managedRouter entity definition.",
"description":"Deprecated: See the managedRouter tenantId attribute.",
"type": "string",
"required": false,
"create": true
Expand Down Expand Up @@ -637,9 +637,17 @@
"description": "This optional entity holds configuration needed for the router to be managed by a central manager",
"extends": "configurationEntity",
"singleton": true,
"operations": ["UPDATE"],
"attributes": {
"vanId": {
"description":"The unique ID of the Virtual Application Network that this router is a member of.",
"networkId": {
"description":"The unique ID of the AMQP Network that this router is a member of.",
"type": "string",
"required": false,
"create": true,
"update": true
},
"tenantId": {
"description":"The unique VAN tenant ID for routers that are connected to a multi-tenant service backbone.",
"type": "string",
"required": false,
"create": true
Expand All @@ -648,7 +656,7 @@
},

"site": {
"description":"Optional. If present, this entity causes the router to emit a SITE record for VanFLow in the case that there is not an external controller to emit this record.",
"description":"Optional. If present, this entity causes the router to emit a SITE record for VanFlow in the case that there is not an external controller to emit this record.",
"extends": "configurationEntity",
"attributes": {
"location": {
Expand Down
1 change: 1 addition & 0 deletions python/skupper_router_internal/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self) -> None:
self._prototype(self.qd_error_message, c_char_p, [], check=False)
self._prototype(self.qd_log_entity, c_long, [py_object])
self._prototype(self.qd_dispatch_configure_managed_router, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_update_managed_router, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_router, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_configure_site, None, [self.qd_dispatch_p, py_object])
self._prototype(self.qd_dispatch_prepare, None, [self.qd_dispatch_p])
Expand Down
3 changes: 3 additions & 0 deletions python/skupper_router_internal/management/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ def __init__(self, agent, entity_type, attributes=None):
def create(self):
self._qd.qd_dispatch_configure_managed_router(self._dispatch, self)

def _update(self):
self._qd.qd_dispatch_update_managed_router(self._dispatch, self)

def __str__(self):
return super(ManagedRouterEntity, self).__str__().replace("Entity(", "ManagedRouterEntity(")

Expand Down
8 changes: 4 additions & 4 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@
//
// If this router is annotated with a van-id, add the van-id to the address
//
const char *van_id = qdr_core_van_id(core);
if (!!van_id) {
char *address = (char*) malloc(strlen(config_address) + strlen(van_id) + 2);
strcpy(address, van_id);
const char *tenant_id = qdr_core_tenant_id(core);
if (!!tenant_id) {
char *address = (char*) malloc(strlen(config_address) + strlen(tenant_id) + 2);
strcpy(address, tenant_id);
strcat(address, "/");
strcat(address, config_address);
config->address = address;
Expand Down
1 change: 1 addition & 0 deletions src/amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data";
const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink";
const char * const QD_CAPABILITY_STREAMING_DELIVERIES = "qd.streaming-deliveries";
const char * const QD_CAPABILITY_RESEND_RELEASED = "qd.resend-released";
const char * const QD_CAPABILITY_CROSS_NETWORK = "qd.cross-network";
const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
const char * const QD_CAPABILITY_STREAMING_LINKS = "qd.streaming-links";
const char * const QD_CAPABILITY_INTER_EDGE = "qd.router-inter-edge";
Expand Down
49 changes: 36 additions & 13 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void qd_router_free(qd_router_t *router);
void qd_error_initialize(void);
static void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id);
static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area);
static void qd_dispatch_set_router_van_id(qd_dispatch_t *qd, char *_van_id);
static void qd_dispatch_set_router_tenant_id(qd_dispatch_t *qd, char *_tenant_id);
static void qd_dispatch_set_router_network_id(qd_dispatch_t *qd, char *_network_id);
static void qd_dispatch_policy_c_counts_free(PyObject *capsule);

const char *CLOSEST_DISTRIBUTION = "closest";
Expand Down Expand Up @@ -221,10 +222,23 @@ static void qd_dispatch_set_router_default_distribution(qd_dispatch_t *qd, char

qd_error_t qd_dispatch_configure_managed_router(qd_dispatch_t *qd, qd_entity_t *entity)
{
char *van_id = qd_entity_opt_string(entity, "vanId", 0); QD_ERROR_RET();
if (van_id) {
qd_dispatch_set_router_van_id(qd, van_id);
char *tenant_id = qd_entity_opt_string(entity, "tenantId", 0); QD_ERROR_RET();
if (tenant_id) {
qd_dispatch_set_router_tenant_id(qd, tenant_id);
}

char *network_id = qd_entity_opt_string(entity, "networkId", 0); QD_ERROR_RET();
qd_dispatch_set_router_network_id(qd, network_id);

return QD_ERROR_NONE;
}

qd_error_t qd_dispatch_update_managed_router(qd_dispatch_t *qd, qd_entity_t *entity)
{
char *network_id = qd_entity_opt_string(entity, "networkId", 0); QD_ERROR_RET();
qd_dispatch_set_router_network_id(qd, network_id);
qdr_core_set_network_id(qd->router->router_core, network_id);

return QD_ERROR_NONE;
}

Expand All @@ -234,8 +248,8 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "id", 0)); QD_ERROR_RET();
char *van_id = qd_entity_opt_string(entity, "vanId", 0); QD_ERROR_RET();
if (van_id) {
qd_dispatch_set_router_van_id(qd, van_id);
qd_log(LOG_ROUTER, QD_LOG_WARNING, "The vanId attribute is deprecated in the router entity. Please move it to managedRouter.");
qd_dispatch_set_router_tenant_id(qd, van_id);
qd_log(LOG_ROUTER, QD_LOG_WARNING, "The vanId attribute is deprecated in the router entity. Please use tenantId in managedRouter.");
}
qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
if (!qd->router_id) {
Expand Down Expand Up @@ -397,12 +411,20 @@ static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area) {
qd->router_area = _area;
}

// Takes ownership of _van_id
static void qd_dispatch_set_router_van_id(qd_dispatch_t *qd, char *_van_id) {
if (qd->van_id) {
free(qd->van_id);
// Takes ownership of _tenant_id
static void qd_dispatch_set_router_tenant_id(qd_dispatch_t *qd, char *_tenant_id) {
if (qd->tenant_id) {
free(qd->tenant_id);
}
qd->tenant_id = _tenant_id;
}

// Takes ownership of _network_id
static void qd_dispatch_set_router_network_id(qd_dispatch_t *qd, char *_network_id) {
if (qd->network_id) {
free(qd->network_id);
}
qd->van_id = _van_id;
qd->network_id = _network_id;
}

void qd_dispatch_free(qd_dispatch_t *qd)
Expand All @@ -425,7 +447,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
qd_python_finalize();
qd_dispatch_set_router_id(qd, NULL);
qd_dispatch_set_router_area(qd, NULL);
qd_dispatch_set_router_van_id(qd, NULL);
qd_dispatch_set_router_tenant_id(qd, NULL);
qd_iterator_finalize();
free(qd->timestamp_format);
free(qd->metadata);
Expand Down Expand Up @@ -454,7 +476,8 @@ qd_connection_manager_t *qd_dispatch_connection_manager(const qd_dispatch_t *qd)
QD_EXPORT void qd_router_setup_late(qd_dispatch_t *qd)
{
qd->router->tracemask = qd_tracemask();
qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id, qd->router->van_id);
qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id, qd->router->tenant_id);
qdr_core_set_network_id(qd->router->router_core, qd->network_id);
qd_router_python_setup(qd->router);
qd_timer_schedule(qd->router->timer, 1000);
}
Expand Down
3 changes: 2 additions & 1 deletion src/dispatch_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ struct qd_dispatch_t {
char *sasl_config_name;
char *router_area;
char *router_id;
char *van_id;
char *network_id;
char *tenant_id;
char *timestamp_format;
char *metadata;
bool timestamps_in_utc;
Expand Down
67 changes: 63 additions & 4 deletions src/iterator.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ typedef enum {
//
// Static state that influences how the iterator operates.
//
static bool edge_mode = false;
static char *my_area = 0;
static char *my_router = 0;
static bool edge_mode = false;
static char *my_network = 0;
static char *my_area = 0;
static char *my_router = 0;

//
// Used for edge routers only. This is a list of routers that are connected directly
Expand Down Expand Up @@ -193,6 +194,50 @@ static void parse_address_view(qd_iterator_t *iter)
return;
}

if (qd_iterator_prefix(iter, "xnet/")) {
assert(my_area && my_router); // ensure qd_iterator_set_address called!
if (qd_iterator_prefix(iter, my_network)) {
if (qd_iterator_prefix(iter, "all/") || qd_iterator_prefix(iter, my_area)) {
if (qd_iterator_prefix(iter, "all/")) {
iter->prefix = QD_ITER_HASH_PREFIX_TOPOLOGICAL;
iter->state = STATE_AT_PREFIX;
return;
} else if (qd_iterator_prefix(iter, my_router)) {
iter->prefix = QD_ITER_HASH_PREFIX_LOCAL;
iter->state = STATE_AT_PREFIX;
return;
}

if (edge_mode)
set_to_edge_connection(iter);
else {
iter->prefix = QD_ITER_HASH_PREFIX_ROUTER;
iter->state = STATE_AT_PREFIX;
iter->mode = MODE_TO_SLASH;
}
return;
}

if (edge_mode)
set_to_edge_connection(iter);
else {
iter->prefix = QD_ITER_HASH_PREFIX_AREA;
iter->state = STATE_AT_PREFIX;
iter->mode = MODE_TO_SLASH;
}
return;
}

if (edge_mode) {
set_to_edge_connection(iter);
} else {
iter->prefix = QD_ITER_HASH_PREFIX_NETWORK;
iter->state = STATE_AT_PREFIX;
iter->mode = MODE_TO_SLASH;
}
return;
}

if (qd_iterator_prefix(iter, "edge/")) {
if (qd_iterator_prefix(iter, my_router)) {
iter->prefix = QD_ITER_HASH_PREFIX_LOCAL;
Expand Down Expand Up @@ -503,6 +548,18 @@ void qd_iterator_set_address(bool _edge_mode, const char *area, const char *rout
sprintf(my_router, "%s/", router);
}

void qd_iterator_set_network(const char *network)
{
free(my_network);
if (!network) {
my_network = 0;
} else {
const size_t network_size = strlen(network);
my_network = qd_malloc(network_size + 2);
sprintf(my_network, "%s/", network);
}
}

void qd_iterator_add_peer_edge(const char *router)
{
qd_iterator_peer_edge_t *peer_edge = NEW(qd_iterator_peer_edge_t);
Expand Down Expand Up @@ -801,7 +858,7 @@ bool qd_iterator_equal_n(qd_iterator_t *iter, const unsigned char *string, size_

bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix)
{
if (!iter)
if (!iter || !prefix)
return false;

qd_buffer_field_t save_pointer = iter->view_pointer;
Expand Down Expand Up @@ -1006,10 +1063,12 @@ qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter)

void qd_iterator_finalize(void)
{
free(my_network);
free(my_area);
free(my_router);

// unit tests need these zeroed
my_network = 0;
my_area = 0;
my_router = 0;
}
Expand Down
5 changes: 3 additions & 2 deletions src/router.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
router->router_mode = mode;
router->router_area = area;
router->router_id = id;
router->van_id = qd->van_id;
router->tenant_id = qd->tenant_id;

sys_mutex_init(&router->lock);
router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router);
Expand All @@ -89,6 +89,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
// uses this to offload some of the address-processing load from the router.
//
qd_iterator_set_address(mode == QD_ROUTER_MODE_EDGE, area, id);
qd_iterator_set_network(qd->network_id);

switch (router->router_mode) {
case QD_ROUTER_MODE_STANDALONE:
Expand Down Expand Up @@ -121,7 +122,7 @@ void qd_router_free(qd_router_t *router)
//
router->router_id = 0;
router->router_area = 0;
router->van_id = 0;
router->tenant_id = 0;

qd_router_python_free(router);
qdr_core_free(router->router_core);
Expand Down
Loading