diff --git a/docs/notes/cross-network.md b/docs/notes/cross-network.md new file mode 100644 index 000000000..ff26bce16 --- /dev/null +++ b/docs/notes/cross-network.md @@ -0,0 +1,177 @@ + + + + + + + + + + + + + + + + + +# Cross (Inter) Network Communication + +Cross-network communication is message transfer between endpoints connected to disjoint Skupper/AMQP networks. The connectivity between the networks is achieved using connections with the 'inter-network' role and auto-links established over those connections. + +The easy part is creating connectivity for a particular mobile address, the address of a service. More challenging is the need to provide reply-to connectivity using a router-assigned dynamic address on the client-side (for request/response traffic patterns). + +## Use Cases + +There are a couple of use cases that drive the requirements for cross-network communication. + +### Central Management of Networks + +In the case where an enterprise maintains a large number of virtual application networks, a good way to provide connectivity for a management plane is to create a management network that maintains cross-network connectivity to each of the managed networks. + +In such a network, there is a inter-network connection between at least one router in the managed network and at lease one router in the management network. Multiple connections may be desired for redundancy and availability. All message connectivity is strictly between the management network and the individual managed network. No communication will be possible between two managed networks via the management network, thus preserving the isolation provided by an application network. + +### Inter-Network Federation + +In the case where an application running on a virtual application network wishes to expose a service for use from within another VAN without creating a general-use ingress, cross-network connectivity can be used to create a secure tunnel between VANs for specifically configured services. + +## Setting up Cross-Network Connectivity + +To illustrate the configuration of inter-network communication, we'll use the inter-network federation use case with two networks, net-A and net-B. + +Both networks are assumed to be composed of multiple interior routers and possibly some edge routers. The inter-network connection must be between two interior routers, one in each network. + +### Router Configuration + +All routers (interior and edge) must be configured with the name of the network using the `network` configuration: + +``` +network { + networkId: net-A +} +``` +### Listener and Connector Configuration + +An interior router in each network must be designated to be endpoints of the inter-network connection. For example, routers A3 (in net-A) and B2 (in net-B) are designated. A3 is configured with a `listener` and B2 is configured with a `connector`, both using role `inter-network`. B2 is configured to connect to A3 using standard host/port addressing and security. Both the listener and connector must be named because the configurations will include `autoLink` entities that refer to those connections. + +### Exposing Services Cross-Network + +Since the `inter-network` connection does not provide the network-joining functions of an `inter-router` connection, the two networks are not joined and do not share any topology or routing information. Nothing will flow over the `inter-network` connection until an `autoLink` is created which establishes a unidirectional link between the networks. + +The full functionality of auto-links is available for `inter-network` connections. They behave similarly to `route-container` connections. However, it is recommended for inter-network use cases that all auto-links be configured as `direction: in` and "pull" from the network hosting a particular destination. + +For example, if net-A hosts a service using the address `service-45` anywhere in its network, an auto-link should be configured in the designated router (A3) as follows: + +``` +autoLink { + connection: + address: service-45 + direction: in +} +``` + +This will cause the `service-45` address to be reachable from all routers in net-B. + +Each service that is to be reachable cross-network must have an auto-link created for it in the network in which the service is hosted. + +Important Note: Destinations cannot be load-balanced across different networks. In other words, there must not be cross-network services in different networks with the same address. If auto-links are created in both directions for an address, a loop is created that will almost certainly forward deliveries to that address to the wrong places. + +### Using Dynamic Addresses + +Almost always, service traffic is not a one-way affair. A user of a service expects to receive a response from the service when a request is sent. Such a client will first create a receiving link using a dynamic terminus. The dynamically-allocated address is then placed in the request's `reply-to` header for the server to use as the destination address of the reply or replies. + +If a network in a federation relationship is going to host clients using dynamic addresses, it must create an auto-link to carry those replies back from the serving network. The auto-link looks like this: + +``` +autoLink { + connection: + direction: in + externalAddress: _xnet/ +} +``` + +There are a couple of notable characteristics of this auto-link. It does not have an `address` attribute, making it "anonymous". This is important because it must issue credit to the peer network regardless of the existance of a particular destination locally. + +The other important aspect of this auto-link is that its remote address is interpreted at the other end as a "remote-network" address which will match any dynamic address created on the local network. This means that only one auto-link needs to be created regardless of how many explicit service addresses are exposed from the peer network. + +## A Complete Example + +The following example shows how to create a simple two-network federation on a single host where each network has one router. + +Router A: +``` +router { + id: A + mode: interior +} + +network { + networkId: net-A +} + +listener { + port: 10001 + role: normal +} + +listener { + name: federation + port: 11001 + role: inter-network +} + +autoLink { + connection: federation + direction: in + externalAddress: _xnet/net-A +} + +autoLink { + connection: federation + direction: in + address: service-A-to-B +} +``` + +Router B: +``` +router { + id: B + mode: interior +} + +network { + networkId: net-B +} + +listener { + port: 10002 + role: normal +} + +connector { + name: federation + port: 11001 + role: inter-network +} + +autoLink { + connection: federation + direction: in + externalAddress: _xnet/net-B +} + +autoLink { + connection: federation + direction: in + address: service-B-to-A +} +``` + +## Using Cross-Network Connectivity as an Endpoint + +## Backward Compatibility and Breaking Changes + +The `qd.cross-network` link capability is needed to provide backward compatibility with the new changes. At the next major version (v4), it is recommended that the `_xnet` address format be deprecated and its functionality be rolled into an extension of the `_topo` format for use by all endpoints. At this point, it would also be recommended to leave `_xnet` in place as an alias for `_topo` to protect applications written to use it. + +Changing the `_topo` format between major versions will break networks that are partially upgraded (i.e. routers with different formats of the topological address). diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index c722b4ad6..8510c5bb6 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -140,11 +140,12 @@ extern const char * const QD_CAPABILITY_ROUTER_DATA; extern const char * const QD_CAPABILITY_EDGE_DOWNLINK; extern const char * const QD_CAPABILITY_STREAMING_DELIVERIES; extern const char * const QD_CAPABILITY_RESEND_RELEASED; +extern const char * const QD_CAPABILITY_CROSS_NETWORK; /// @} /** @name Dynamic Node Properties */ /// @{ -extern const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS; ///< Address for routing dynamic sources +extern const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS; ///< Address for routing dynamic sources /// @} /** @name Connection Properties */ diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h index bea833ac4..aa61b7b9e 100644 --- a/include/qpid/dispatch/iterator.h +++ b/include/qpid/dispatch/iterator.h @@ -49,6 +49,7 @@ typedef struct qd_iterator_t qd_iterator_t; */ #define QD_ITER_HASH_PREFIX_TOPOLOGICAL 'T' #define QD_ITER_HASH_PREFIX_LOCAL 'L' +#define QD_ITER_HASH_PREFIX_NETWORK 'N' #define QD_ITER_HASH_PREFIX_AREA 'A' #define QD_ITER_HASH_PREFIX_ROUTER 'R' #define QD_ITER_HASH_PREFIX_MOBILE 'M' @@ -137,6 +138,11 @@ void qd_iterator_finalize(void); */ void qd_iterator_set_address(bool edge_mode, const char *area, const char *router); +/** + * Set the network ID for the local router. This can be updated repeatedly during run-time. + */ +void qd_iterator_set_network(const char *network); + /** * Add and delete peer-edge router identities. When in edge mode, peer edge routers * result in different hash results than remote edge routers. These functions are used diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 5c2c3995a..e48908238 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -347,6 +347,7 @@ typedef enum { QDR_ROLE_EDGE_CONNECTION, QDR_ROLE_INTER_ROUTER_DATA, QDR_ROLE_INTER_EDGE, + QDR_ROLE_INTER_NETWORK, } qdr_connection_role_t; typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void *token); @@ -424,6 +425,7 @@ void qdr_connection_set_tracing(qdr_connection_t *conn, bool enable_tracing); void qdr_core_close_connection(qdr_connection_t *conn); bool qdr_connection_route_container(qdr_connection_t *conn); +bool qdr_connection_inter_network(qdr_connection_t *conn); /** * qdr_connection_set_context diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 57e6fcae9..b3f49821a 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -49,7 +49,12 @@ ENUM_DECLARE(qd_router_mode); /** * Allocate and start an instance of the router core module. */ -qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *van_id); +qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *tenant_id); + +/** + * Set the network id + */ +void qdr_core_set_network_id(qdr_core_t *core, const char *network_id); /** * Stop and deallocate an instance of the router core. @@ -75,12 +80,12 @@ qd_dispatch_t *qdr_core_dispatch(qdr_core_t *core); void qdr_process_tick(qdr_core_t *core); /** - * @brief Return the text of the router's virtual application network ID, or 0. + * @brief Return the text of the router's virtual application network tenant ID, or 0. * * @param core Pointer to the core object returned by qdr_core() - * @return const char* null-terminated text of the van-id or 0 if there's no id. + * @return const char* null-terminated text of the tenant-id or 0 if there's no id. */ -const char *qdr_core_van_id(const qdr_core_t *core); +const char *qdr_core_tenant_id(const qdr_core_t *core); /** diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index a04a36989..6d1521623 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -462,6 +462,16 @@ "description": "Number of deliveries that were sent to route container connections.", "graph": true }, + "deliveriesIngressInterNetwork": { + "type": "integer", + "description": "Number of deliveries that were received from a connected network.", + "graph": true + }, + "deliveriesEgressInterNetwork": { + "type": "integer", + "description": "Number of deliveries that were sent to a connected network.", + "graph": true + }, "residentMemoryUsage": { "type": "integer", "graph": true, @@ -511,7 +521,7 @@ "create": true }, "vanId": { - "description":"Deprecated: See the managedRouter entity definition.", + "description":"Deprecated: See the network tenantId attribute.", "type": "string", "required": false, "create": true @@ -633,13 +643,21 @@ } }, - "managedRouter": { - "description": "This optional entity holds configuration needed for the router to be managed by a central manager", + "network": { + "description": "This optional entity holds configuration needed for the router to participate in inter-network communication", "extends": "configurationEntity", "singleton": true, + "operations": ["UPDATE"], "attributes": { - "vanId": { - "description":"The unique ID of the Virtual Application Network that this router is a member of.", + "networkId": { + "description":"The unique ID of the AMQP Network that this router is a member of.", + "type": "string", + "required": false, + "create": true, + "update": true + }, + "tenantId": { + "description":"The unique VAN tenant ID for routers that are connected to a multi-tenant service backbone.", "type": "string", "required": false, "create": true @@ -648,7 +666,7 @@ }, "site": { - "description":"Optional. If present, this entity causes the router to emit a SITE record for VanFLow in the case that there is not an external controller to emit this record.", + "description":"Optional. If present, this entity causes the router to emit a SITE record for VanFlow in the case that there is not an external controller to emit this record.", "extends": "configurationEntity", "attributes": { "location": { @@ -784,7 +802,8 @@ "inter-router", "route-container", "edge", - "inter-edge" + "inter-edge", + "inter-network" ], "default": "normal", "description": "The role of an established connection. In the normal role, the connection is assumed to be used for AMQP clients that are doing normal message delivery over the connection. In the inter-router role, the connection is assumed to be to another router in the network. Inter-router discovery and routing protocols can only be used over inter-router connections. route-container role can be used for router-container connections, for example, a router-broker connection. In the edge role, the connection is assumed to be between an edge router and an interior router.", @@ -958,7 +977,8 @@ "inter-router", "route-container", "edge", - "inter-edge" + "inter-edge", + "inter-network" ], "default": "normal", "description": "The role of an established connection. In the normal role, the connection is assumed to be used for AMQP clients that are doing normal message delivery over the connection. In the inter-router role, the connection is assumed to be to another router in the network. Inter-router discovery and routing protocols can only be used over inter-router connections. route-container role can be used for router-container connections, for example, a router-broker connection. In the edge role, the connection is assumed to be between and edge router and an interior router.", @@ -1598,6 +1618,16 @@ "description": "Number of deliveries that were sent to a route-container address.", "graph": true }, + "deliveriesIngressInterNetwork": { + "type": "integer", + "description": "Number of deliveries that were received from a connected network.", + "graph": true + }, + "deliveriesEgressInterNetwork": { + "type": "integer", + "description": "Number of deliveries that were sent to a connected network.", + "graph": true + }, "key": { "description": "Internal unique (to this router) key to identify the address", "type": "string" diff --git a/python/skupper_router_internal/dispatch.py b/python/skupper_router_internal/dispatch.py index 6d6ff6dda..6b4bf154d 100644 --- a/python/skupper_router_internal/dispatch.py +++ b/python/skupper_router_internal/dispatch.py @@ -103,7 +103,8 @@ def __init__(self) -> None: self._prototype(self.qd_error_code, c_long, [], check=False) self._prototype(self.qd_error_message, c_char_p, [], check=False) self._prototype(self.qd_log_entity, c_long, [py_object]) - self._prototype(self.qd_dispatch_configure_managed_router, None, [self.qd_dispatch_p, py_object]) + self._prototype(self.qd_dispatch_configure_network, None, [self.qd_dispatch_p, py_object]) + self._prototype(self.qd_dispatch_update_network, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_router, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_site, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_prepare, None, [self.qd_dispatch_p]) diff --git a/python/skupper_router_internal/management/agent.py b/python/skupper_router_internal/management/agent.py index 2ebe29025..f4ff7b3fe 100644 --- a/python/skupper_router_internal/management/agent.py +++ b/python/skupper_router_internal/management/agent.py @@ -280,15 +280,18 @@ def __str__(self): return super(RouterEntity, self).__str__().replace("Entity(", "RouterEntity(") -class ManagedRouterEntity(EntityAdapter): +class NetworkEntity(EntityAdapter): def __init__(self, agent, entity_type, attributes=None): - super(ManagedRouterEntity, self).__init__(agent, entity_type, attributes, validate=False) + super(NetworkEntity, self).__init__(agent, entity_type, attributes, validate=False) def create(self): - self._qd.qd_dispatch_configure_managed_router(self._dispatch, self) + self._qd.qd_dispatch_configure_network(self._dispatch, self) + + def _update(self): + self._qd.qd_dispatch_update_network(self._dispatch, self) def __str__(self): - return super(ManagedRouterEntity, self).__str__().replace("Entity(", "ManagedRouterEntity(") + return super(NetworkEntity, self).__str__().replace("Entity(", "NetworkEntity(") class SiteEntity(EntityAdapter): diff --git a/python/skupper_router_internal/management/config.py b/python/skupper_router_internal/management/config.py index 2c6f22c9a..ecc34f64b 100644 --- a/python/skupper_router_internal/management/config.py +++ b/python/skupper_router_internal/management/config.py @@ -304,7 +304,7 @@ def configure(attributes): # Configure and prepare the router before we can activate the agent. configure(config.by_type('router')[0]) - configure(config.by_type('managedRouter')[0]) + configure(config.by_type('network')[0]) qd.qd_dispatch_prepare(dispatch) qd.qd_router_setup_late(dispatch) # Actions requiring active management agent. agent.activate("$_management_internal") diff --git a/src/adaptors/adaptor_common.c b/src/adaptors/adaptor_common.c index e3e7f5bf3..432c5a909 100644 --- a/src/adaptors/adaptor_common.c +++ b/src/adaptors/adaptor_common.c @@ -97,10 +97,10 @@ qd_error_t qd_load_adaptor_config(qdr_core_t *core, qd_adaptor_config_t *config, // // If this router is annotated with a van-id, add the van-id to the address // - const char *van_id = qdr_core_van_id(core); - if (!!van_id) { - char *address = (char*) malloc(strlen(config_address) + strlen(van_id) + 2); - strcpy(address, van_id); + const char *tenant_id = qdr_core_tenant_id(core); + if (!!tenant_id) { + char *address = (char*) malloc(strlen(config_address) + strlen(tenant_id) + 2); + strcpy(address, tenant_id); strcat(address, "/"); strcat(address, config_address); config->address = address; diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 028934552..f6f1b691e 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -50,11 +50,12 @@ amqp_adaptor_t amqp_adaptor; -static char *router_role = "inter-router"; -static char *router_data_role = "inter-router-data"; -static char *container_role = "route-container"; -static char *edge_role = "edge"; -static char *inter_edge_role = "inter-edge"; +static char *router_role = "inter-router"; +static char *router_data_role = "inter-router-data"; +static char *container_role = "route-container"; +static char *edge_role = "edge"; +static char *inter_edge_role = "inter-edge"; +static char *inter_network_role = "inter-network"; static void deferred_AMQP_rx_handler(qd_connection_t *qd_conn, void *context, bool discard); @@ -295,9 +296,11 @@ static void qd_router_connection_get_config(const qd_connection_t *conn, *strip_annotations_out = false; *role = QDR_ROLE_INTER_EDGE; *cost = cf->inter_router_cost; - } else if (cf && (strcmp(cf->role, container_role) == 0)) // backward compat + } else if (cf && (strcmp(cf->role, container_role) == 0)) { // backward compat *role = QDR_ROLE_ROUTE_CONTAINER; - else + } else if (cf && (strcmp(cf->role, inter_network_role) == 0)) { + *role = QDR_ROLE_INTER_NETWORK; + } else *role = QDR_ROLE_NORMAL; *name = cf ? cf->name : 0; diff --git a/src/amqp.c b/src/amqp.c index d531ad008..3c868c806 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -34,6 +34,7 @@ const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data"; const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink"; const char * const QD_CAPABILITY_STREAMING_DELIVERIES = "qd.streaming-deliveries"; const char * const QD_CAPABILITY_RESEND_RELEASED = "qd.resend-released"; +const char * const QD_CAPABILITY_CROSS_NETWORK = "qd.cross-network"; const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY"; const char * const QD_CAPABILITY_STREAMING_LINKS = "qd.streaming-links"; const char * const QD_CAPABILITY_INTER_EDGE = "qd.router-inter-edge"; diff --git a/src/dispatch.c b/src/dispatch.c index b647f7c7e..548ea9ded 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -58,7 +58,8 @@ void qd_router_free(qd_router_t *router); void qd_error_initialize(void); static void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id); static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area); -static void qd_dispatch_set_router_van_id(qd_dispatch_t *qd, char *_van_id); +static void qd_dispatch_set_router_tenant_id(qd_dispatch_t *qd, char *_tenant_id); +static void qd_dispatch_set_router_network_id(qd_dispatch_t *qd, char *_network_id); static void qd_dispatch_policy_c_counts_free(PyObject *capsule); const char *CLOSEST_DISTRIBUTION = "closest"; @@ -219,12 +220,25 @@ static void qd_dispatch_set_router_default_distribution(qd_dispatch_t *qd, char free(distribution); } -qd_error_t qd_dispatch_configure_managed_router(qd_dispatch_t *qd, qd_entity_t *entity) +qd_error_t qd_dispatch_configure_network(qd_dispatch_t *qd, qd_entity_t *entity) { - char *van_id = qd_entity_opt_string(entity, "vanId", 0); QD_ERROR_RET(); - if (van_id) { - qd_dispatch_set_router_van_id(qd, van_id); + char *tenant_id = qd_entity_opt_string(entity, "tenantId", 0); QD_ERROR_RET(); + if (tenant_id) { + qd_dispatch_set_router_tenant_id(qd, tenant_id); } + + char *network_id = qd_entity_opt_string(entity, "networkId", 0); QD_ERROR_RET(); + qd_dispatch_set_router_network_id(qd, network_id); + + return QD_ERROR_NONE; +} + +qd_error_t qd_dispatch_update_network(qd_dispatch_t *qd, qd_entity_t *entity) +{ + char *network_id = qd_entity_opt_string(entity, "networkId", 0); QD_ERROR_RET(); + qd_dispatch_set_router_network_id(qd, network_id); + qdr_core_set_network_id(qd->router->router_core, network_id); + return QD_ERROR_NONE; } @@ -234,8 +248,8 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity) qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "id", 0)); QD_ERROR_RET(); char *van_id = qd_entity_opt_string(entity, "vanId", 0); QD_ERROR_RET(); if (van_id) { - qd_dispatch_set_router_van_id(qd, van_id); - qd_log(LOG_ROUTER, QD_LOG_WARNING, "The vanId attribute is deprecated in the router entity. Please move it to managedRouter."); + qd_dispatch_set_router_tenant_id(qd, van_id); + qd_log(LOG_ROUTER, QD_LOG_WARNING, "The vanId attribute is deprecated in the router entity. Please use tenantId in managedRouter."); } qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET(); if (!qd->router_id) { @@ -397,12 +411,20 @@ static void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area) { qd->router_area = _area; } -// Takes ownership of _van_id -static void qd_dispatch_set_router_van_id(qd_dispatch_t *qd, char *_van_id) { - if (qd->van_id) { - free(qd->van_id); +// Takes ownership of _tenant_id +static void qd_dispatch_set_router_tenant_id(qd_dispatch_t *qd, char *_tenant_id) { + if (qd->tenant_id) { + free(qd->tenant_id); + } + qd->tenant_id = _tenant_id; +} + +// Takes ownership of _network_id +static void qd_dispatch_set_router_network_id(qd_dispatch_t *qd, char *_network_id) { + if (qd->network_id) { + free(qd->network_id); } - qd->van_id = _van_id; + qd->network_id = _network_id; } void qd_dispatch_free(qd_dispatch_t *qd) @@ -425,7 +447,8 @@ void qd_dispatch_free(qd_dispatch_t *qd) qd_python_finalize(); qd_dispatch_set_router_id(qd, NULL); qd_dispatch_set_router_area(qd, NULL); - qd_dispatch_set_router_van_id(qd, NULL); + qd_dispatch_set_router_tenant_id(qd, NULL); + qd_dispatch_set_router_network_id(qd, NULL); qd_iterator_finalize(); free(qd->timestamp_format); free(qd->metadata); @@ -454,7 +477,8 @@ qd_connection_manager_t *qd_dispatch_connection_manager(const qd_dispatch_t *qd) QD_EXPORT void qd_router_setup_late(qd_dispatch_t *qd) { qd->router->tracemask = qd_tracemask(); - qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id, qd->router->van_id); + qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id, qd->router->tenant_id); + qdr_core_set_network_id(qd->router->router_core, qd->network_id); qd_router_python_setup(qd->router); qd_timer_schedule(qd->router->timer, 1000); } diff --git a/src/dispatch_private.h b/src/dispatch_private.h index 9afc2fece..73a2ae68e 100644 --- a/src/dispatch_private.h +++ b/src/dispatch_private.h @@ -53,7 +53,8 @@ struct qd_dispatch_t { char *sasl_config_name; char *router_area; char *router_id; - char *van_id; + char *network_id; + char *tenant_id; char *timestamp_format; char *metadata; bool timestamps_in_utc; diff --git a/src/iterator.c b/src/iterator.c index ba26af585..8c5d0d161 100644 --- a/src/iterator.c +++ b/src/iterator.c @@ -86,9 +86,10 @@ typedef enum { // // Static state that influences how the iterator operates. // -static bool edge_mode = false; -static char *my_area = 0; -static char *my_router = 0; +static bool edge_mode = false; +static char *my_network = 0; +static char *my_area = 0; +static char *my_router = 0; // // Used for edge routers only. This is a list of routers that are connected directly @@ -193,6 +194,50 @@ static void parse_address_view(qd_iterator_t *iter) return; } + if (qd_iterator_prefix(iter, "xnet/")) { + assert(my_area && my_router); // ensure qd_iterator_set_address called! + if (qd_iterator_prefix(iter, my_network)) { + if (qd_iterator_prefix(iter, "all/") || qd_iterator_prefix(iter, my_area)) { + if (qd_iterator_prefix(iter, "all/")) { + iter->prefix = QD_ITER_HASH_PREFIX_TOPOLOGICAL; + iter->state = STATE_AT_PREFIX; + return; + } else if (qd_iterator_prefix(iter, my_router)) { + iter->prefix = QD_ITER_HASH_PREFIX_LOCAL; + iter->state = STATE_AT_PREFIX; + return; + } + + if (edge_mode) + set_to_edge_connection(iter); + else { + iter->prefix = QD_ITER_HASH_PREFIX_ROUTER; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + } + return; + } + + if (edge_mode) + set_to_edge_connection(iter); + else { + iter->prefix = QD_ITER_HASH_PREFIX_AREA; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + } + return; + } + + if (edge_mode) { + set_to_edge_connection(iter); + } else { + iter->prefix = QD_ITER_HASH_PREFIX_NETWORK; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + } + return; + } + if (qd_iterator_prefix(iter, "edge/")) { if (qd_iterator_prefix(iter, my_router)) { iter->prefix = QD_ITER_HASH_PREFIX_LOCAL; @@ -230,6 +275,54 @@ static void parse_address_view(qd_iterator_t *iter) } } + if (qd_iterator_prefix(iter, "xedge/")) { + if (qd_iterator_prefix(iter, my_network)) { + if (qd_iterator_prefix(iter, my_router)) { + iter->prefix = QD_ITER_HASH_PREFIX_LOCAL; + iter->state = STATE_AT_PREFIX; + return; + } + + if (edge_mode) { + bool is_peer = false; + qd_iterator_peer_edge_t *peer_edge = DEQ_HEAD(peer_edges); + qd_buffer_field_t save_pointer = iter->view_pointer; + while (!!peer_edge) { + if (qd_iterator_prefix(iter, peer_edge->router_id)) { + is_peer = true; + iter->view_pointer = save_pointer; + break; + } + peer_edge = DEQ_NEXT(peer_edge); + } + + if (is_peer) { + iter->prefix = QD_ITER_HASH_PREFIX_EDGE_SUMMARY; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + } else { + set_to_edge_connection(iter); + } + + return; + } else { + iter->prefix = QD_ITER_HASH_PREFIX_EDGE_SUMMARY; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + return; + } + } + + if (edge_mode) { + set_to_edge_connection(iter); + } else { + iter->prefix = QD_ITER_HASH_PREFIX_NETWORK; + iter->state = STATE_AT_PREFIX; + iter->mode = MODE_TO_SLASH; + } + return; + } + iter->view_pointer = save_pointer; } @@ -503,6 +596,18 @@ void qd_iterator_set_address(bool _edge_mode, const char *area, const char *rout sprintf(my_router, "%s/", router); } +void qd_iterator_set_network(const char *network) +{ + free(my_network); + if (!network) { + my_network = 0; + } else { + const size_t network_size = strlen(network); + my_network = qd_malloc(network_size + 2); + sprintf(my_network, "%s/", network); + } +} + void qd_iterator_add_peer_edge(const char *router) { qd_iterator_peer_edge_t *peer_edge = NEW(qd_iterator_peer_edge_t); @@ -801,7 +906,7 @@ bool qd_iterator_equal_n(qd_iterator_t *iter, const unsigned char *string, size_ bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix) { - if (!iter) + if (!iter || !prefix) return false; qd_buffer_field_t save_pointer = iter->view_pointer; @@ -1006,10 +1111,12 @@ qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter) void qd_iterator_finalize(void) { + free(my_network); free(my_area); free(my_router); // unit tests need these zeroed + my_network = 0; my_area = 0; my_router = 0; } diff --git a/src/policy.c b/src/policy.c index 75214b720..ea34df9fa 100644 --- a/src/policy.c +++ b/src/policy.c @@ -1249,7 +1249,7 @@ void qd_policy_amqp_open_connector(qd_connection_t *qd_conn) { bool connection_allowed = true; if (policy->enableVhostPolicy && - (!qd_conn->role || !strcmp(qd_conn->role, "normal") || !strcmp(qd_conn->role, "route-container"))) { + (!qd_conn->role || !strcmp(qd_conn->role, "normal") || !strcmp(qd_conn->role, "route-container") || !strcmp(qd_conn->role, "inter-network"))) { // Open connection or not based on policy. uint32_t conn_id = qd_conn->connection_id; diff --git a/src/router.c b/src/router.c index f2ec181c8..9d348a565 100644 --- a/src/router.c +++ b/src/router.c @@ -79,7 +79,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are router->router_mode = mode; router->router_area = area; router->router_id = id; - router->van_id = qd->van_id; + router->tenant_id = qd->tenant_id; sys_mutex_init(&router->lock); router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router); @@ -89,6 +89,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are // uses this to offload some of the address-processing load from the router. // qd_iterator_set_address(mode == QD_ROUTER_MODE_EDGE, area, id); + qd_iterator_set_network(qd->network_id); switch (router->router_mode) { case QD_ROUTER_MODE_STANDALONE: @@ -121,7 +122,7 @@ void qd_router_free(qd_router_t *router) // router->router_id = 0; router->router_area = 0; - router->van_id = 0; + router->tenant_id = 0; qd_router_python_free(router); qdr_core_free(router->router_core); diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c index 7382b3407..a62c3d82f 100644 --- a/src/router_core/agent_address.c +++ b/src/router_core/agent_address.c @@ -37,11 +37,13 @@ #define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 13 #define QDR_ADDRESS_DELIVERIES_EGRESS_ROUTE_CONTAINER 14 #define QDR_ADDRESS_DELIVERIES_INGRESS_ROUTE_CONTAINER 15 -#define QDR_ADDRESS_TRANSIT_OUTSTANDING 16 -#define QDR_ADDRESS_TRACKED_DELIVERIES 17 -#define QDR_ADDRESS_PRIORITY 18 -#define QDR_ADDRESS_DELIVERIES_REDIRECTED 19 -#define QDR_ADDRESS_WATCH 20 +#define QDR_ADDRESS_DELIVERIES_EGRESS_INTER_NETWORK 16 +#define QDR_ADDRESS_DELIVERIES_INGRESS_INTER_NETWORK 17 +#define QDR_ADDRESS_TRANSIT_OUTSTANDING 18 +#define QDR_ADDRESS_TRACKED_DELIVERIES 19 +#define QDR_ADDRESS_PRIORITY 20 +#define QDR_ADDRESS_DELIVERIES_REDIRECTED 21 +#define QDR_ADDRESS_WATCH 22 const char *qdr_address_columns[] = {"name", @@ -60,6 +62,8 @@ const char *qdr_address_columns[] = "deliveriesFromContainer", "deliveriesEgressRouteContainer", "deliveriesIngressRouteContainer", + "deliveriesEgressInterNetwork", + "deliveriesIngressInterNetwork", "transitOutstanding", "trackedDeliveries", "priority", @@ -153,6 +157,14 @@ static void qdr_insert_address_columns_CT(qdr_core_t *core, qd_compose_insert_ulong(body, addr->deliveries_ingress_route_container); break; + case QDR_ADDRESS_DELIVERIES_EGRESS_INTER_NETWORK: + qd_compose_insert_ulong(body, addr->deliveries_egress_inter_network); + break; + + case QDR_ADDRESS_DELIVERIES_INGRESS_INTER_NETWORK: + qd_compose_insert_ulong(body, addr->deliveries_ingress_inter_network); + break; + case QDR_ADDRESS_TRANSIT_OUTSTANDING: if (addr->outstanding_deliveries) { qd_compose_start_list(body); diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h index e25ba6a46..88f8e79ca 100644 --- a/src/router_core/agent_address.h +++ b/src/router_core/agent_address.h @@ -31,7 +31,7 @@ void qdra_address_get_CT(qdr_core_t *core, const char *qdr_address_columns[]); -#define QDR_ADDRESS_COLUMN_COUNT 21 +#define QDR_ADDRESS_COLUMN_COUNT 23 extern const char *qdr_address_columns[QDR_ADDRESS_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index 53391d87a..293736d61 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -71,6 +71,7 @@ const char *qdr_connection_roles[] = "edge", "inter-router-data", "inter-edge", + "inter-network", 0}; const char *qdr_connection_columns[] = diff --git a/src/router_core/agent_router_metrics.c b/src/router_core/agent_router_metrics.c index ba75a06d0..2922517c5 100644 --- a/src/router_core/agent_router_metrics.c +++ b/src/router_core/agent_router_metrics.c @@ -48,13 +48,15 @@ #define QDR_ROUTER_DELIVERIES_TRANSIT 19 #define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 20 #define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 21 -#define QDR_ROUTER_DELIVERIES_REDIRECTED 22 -#define QDR_ROUTER_LINKS_BLOCKED 23 -#define QDR_ROUTER_UPTIME_SECONDS 24 -#define QDR_ROUTER_MEMORY_USAGE 25 -#define QDR_ROUTER_RSS_USAGE 26 -#define QDR_ROUTER_CONNECTION_COUNTERS 27 -#define QDR_ROUTER_VERSION 28 +#define QDR_ROUTER_DELIVERIES_INGRESS_INTER_NETWORK 22 +#define QDR_ROUTER_DELIVERIES_EGRESS_INTER_NETWORK 23 +#define QDR_ROUTER_DELIVERIES_REDIRECTED 24 +#define QDR_ROUTER_LINKS_BLOCKED 25 +#define QDR_ROUTER_UPTIME_SECONDS 26 +#define QDR_ROUTER_MEMORY_USAGE 27 +#define QDR_ROUTER_RSS_USAGE 28 +#define QDR_ROUTER_CONNECTION_COUNTERS 29 +#define QDR_ROUTER_VERSION 31 const char *qdr_router_columns[] = {"identity", @@ -79,6 +81,8 @@ const char *qdr_router_columns[] = "deliveriesTransit", "deliveriesIngressRouteContainer", "deliveriesEgressRouteContainer", + "deliveriesIngressInterNetwork", + "deliveriesEgressInterNetwork", "deliveriesRedirectedToFallback", "linksBlocked", "uptimeSeconds", @@ -186,6 +190,14 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_ulong(body, core->deliveries_egress_route_container); break; + case QDR_ROUTER_DELIVERIES_INGRESS_INTER_NETWORK: + qd_compose_insert_ulong(body, core->deliveries_ingress_inter_network); + break; + + case QDR_ROUTER_DELIVERIES_EGRESS_INTER_NETWORK: + qd_compose_insert_ulong(body, core->deliveries_egress_inter_network); + break; + case QDR_ROUTER_DELIVERIES_REDIRECTED: qd_compose_insert_ulong(body, core->deliveries_redirected); break; diff --git a/src/router_core/agent_router_metrics.h b/src/router_core/agent_router_metrics.h index b660d4c26..53b089e3b 100644 --- a/src/router_core/agent_router_metrics.h +++ b/src/router_core/agent_router_metrics.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_METRICS_COLUMN_COUNT 29 +#define QDR_ROUTER_METRICS_COLUMN_COUNT 32 extern const char *qdr_router_columns[QDR_ROUTER_METRICS_COLUMN_COUNT + 1]; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 08e40a6d5..eb1344e08 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -173,6 +173,10 @@ bool qdr_connection_route_container(qdr_connection_t *conn) return conn->role == QDR_ROLE_ROUTE_CONTAINER; } +bool qdr_connection_inter_network(qdr_connection_t *conn) +{ + return conn->role == QDR_ROLE_INTER_NETWORK; +} void qdr_connection_set_context(qdr_connection_t *conn, void *context) { @@ -1741,7 +1745,7 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo qdr_connection_group_member_setup_CT(core, conn); } - if (conn->role == QDR_ROLE_ROUTE_CONTAINER) { + if (conn->role == QDR_ROLE_ROUTE_CONTAINER || conn->role == QDR_ROLE_INTER_NETWORK) { // // Notify the route-control module that a route-container connection has opened. // There may be routes that need to be activated due to the opening of this connection. @@ -1788,6 +1792,7 @@ qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connectio case QDR_ROLE_EDGE_CONNECTION: case QDR_ROLE_INTER_ROUTER_DATA: case QDR_ROLE_ROUTE_CONTAINER: + case QDR_ROLE_INTER_NETWORK: case QDR_ROLE_NORMAL: out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 67d6803a4..b466ff0a4 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -782,6 +782,9 @@ int qdr_forward_closest_CT(qdr_core_t *core, if (qdr_connection_route_container(out_link->conn)) { core->deliveries_egress_route_container++; addr->deliveries_egress_route_container++; + } else if (qdr_connection_inter_network(out_link->conn)) { + core->deliveries_egress_inter_network++; + addr->deliveries_egress_inter_network++; } return 1; @@ -1107,6 +1110,9 @@ int qdr_forward_balanced_CT(qdr_core_t *core, if (qdr_connection_route_container(chosen_link->conn)) { core->deliveries_egress_route_container++; addr->deliveries_egress_route_container++; + } else if (qdr_connection_inter_network(chosen_link->conn)) { + core->deliveries_egress_inter_network++; + addr->deliveries_egress_inter_network++; } } return 1; diff --git a/src/router_core/modules/address_lookup_client/address_lookup_client.c b/src/router_core/modules/address_lookup_client/address_lookup_client.c index 5fc812a9c..196e863e4 100644 --- a/src/router_core/modules/address_lookup_client/address_lookup_client.c +++ b/src/router_core/modules/address_lookup_client/address_lookup_client.c @@ -25,6 +25,7 @@ #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/discriminator.h" +#include "qpid/dispatch/protocol_adaptor.h" #include @@ -37,31 +38,41 @@ typedef struct qcm_lookup_client_t { * Generate a temporary routable address for a destination connected to this * router node. Caller must free() return value when done. */ -static char *qdr_generate_temp_addr(qdr_core_t *core) +static char *qdr_generate_temp_addr(qdr_core_t *core, bool use_xnet) { - static const char *edge_template = "amqp:/_edge/%s/temp.%s"; - static const char *edge_template_van = "amqp:/_edge/%s/%s/temp.%s"; - static const char *topo_template = "amqp:/_topo/%s/%s/temp.%s"; - static const char *topo_template_van = "amqp:/_topo/%s/%s/%s/temp.%s"; - const size_t max_template = 20; // printable chars + static const char *edge_template = "amqp:/_edge/%s/temp.%s"; + static const char *edge_template_van = "amqp:/_edge/%s/%s/temp.%s"; + static const char *topo_template = "amqp:/_topo/%s/%s/temp.%s"; + static const char *topo_template_van = "amqp:/_topo/%s/%s/%s/temp.%s"; + static const char *xnet_template = "amqp:/_xnet/%s/%s/%s/temp.%s"; + static const char *xnet_edge_template = "amqp:/_xedge/%s/%s/temp.%s"; + const size_t max_template = 20; // printable chars char discriminator[QD_DISCRIMINATOR_SIZE]; qd_generate_discriminator(discriminator); size_t len = max_template + QD_DISCRIMINATOR_SIZE + 1 - + strlen(core->router_id) + strlen(core->router_area) - + (!!core->van_id ? strlen(core->van_id) : 0); + + strlen(core->router_id) + strlen(core->router_area); + if (use_xnet && !!core->network_id) { + len += strlen(core->network_id); + } else { + len += (!!core->tenant_id ? strlen(core->tenant_id) : 0); + } int rc; char *buffer = qd_malloc(len); if (core->router_mode == QD_ROUTER_MODE_EDGE) { - if (!!core->van_id) { - rc = snprintf(buffer, len, edge_template_van, core->router_id, core->van_id, discriminator); + if (use_xnet && !!core->network_id) { + rc = snprintf(buffer, len, xnet_edge_template, core->network_id, core->router_id, discriminator); + } else if (!!core->tenant_id) { + rc = snprintf(buffer, len, edge_template_van, core->router_id, core->tenant_id, discriminator); } else { rc = snprintf(buffer, len, edge_template, core->router_id, discriminator); } } else { - if (!!core->van_id) { - rc = snprintf(buffer, len, topo_template_van, core->router_area, core->router_id, core->van_id, discriminator); + if (use_xnet && !!core->network_id) { + rc = snprintf(buffer, len, xnet_template, core->network_id, core->router_area, core->router_id, discriminator); + } else if (!!core->tenant_id) { + rc = snprintf(buffer, len, topo_template_van, core->router_area, core->router_id, core->tenant_id, discriminator); } else { rc = snprintf(buffer, len, topo_template, core->router_area, core->router_id, discriminator); } @@ -136,10 +147,12 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, // unlikely). // char *temp_addr = 0; - if (dir == QD_OUTGOING) - temp_addr = qdr_generate_temp_addr(core); - else + if (dir == QD_OUTGOING) { + bool use_xnet = qdr_terminus_has_capability(terminus, QD_CAPABILITY_CROSS_NETWORK); + temp_addr = qdr_generate_temp_addr(core, use_xnet); + } else { temp_addr = qdr_generate_mobile_addr(core); + } qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr); diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c index 31bdd1e14..08913555f 100644 --- a/src/router_core/modules/mobile_sync/mobile.c +++ b/src/router_core/modules/mobile_sync/mobile.c @@ -117,7 +117,7 @@ static qd_address_treatment_t qcm_mobile_sync_default_treatment(qdr_core_t *core static bool qcm_mobile_sync_addr_is_mobile(qdr_address_t *addr) { const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - return !!strchr("MH", hash_key[0]); + return !!strchr("MNH", hash_key[0]); } diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index e950b1099..d0172ba41 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -392,7 +392,7 @@ void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_field_t *container_field, qdr_field_t *connection_field) { - if (conn->role != QDR_ROLE_ROUTE_CONTAINER) + if (conn->role != QDR_ROLE_ROUTE_CONTAINER && conn->role != QDR_ROLE_INTER_NETWORK) return; if (connection_field) { @@ -421,7 +421,7 @@ void qdr_route_connection_opened_CT(qdr_core_t *core, void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) { - if (conn->role != QDR_ROLE_ROUTE_CONTAINER) + if (conn->role != QDR_ROLE_ROUTE_CONTAINER && conn->role != QDR_ROLE_INTER_NETWORK) return; qdr_conn_identifier_t *cid = conn->conn_id; diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 53cf69723..5b8f5e5b3 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -72,7 +72,7 @@ static void qdr_core_setup_init(qdr_core_t *core) qdr_adaptors_init(core); } -qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *van_id) +qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id, const char *tenant_id) { qdr_core_t *core = NEW(qdr_core_t); ZERO(core); @@ -81,7 +81,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, core->router_mode = mode; core->router_area = area; core->router_id = id; - core->van_id = van_id; + core->tenant_id = tenant_id; core->worker_thread_count = qd->thread_count; sys_atomic_init(&core->uptime_ticks, 0); @@ -90,8 +90,8 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, // module logs to the ROUTER_CORE module. There is no need to free the core->log as all log sources are. // freed by qd_dispatch_free() // - if (!!van_id) { - qd_log(LOG_ROUTER, QD_LOG_INFO, "Router is a member of Application Network: %s", van_id); + if (!!tenant_id) { + qd_log(LOG_ROUTER, QD_LOG_INFO, "Router is a member of Application Network Tenant: %s", tenant_id); } // @@ -137,6 +137,21 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, return core; } +void qdr_core_set_network_id_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (!discard) { + core->network_id = (char*) action->args.general.context_1; + qd_iterator_set_network(core->network_id); + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "Router's network-id updated to %s", core->network_id); + } +} + +void qdr_core_set_network_id(qdr_core_t *core, const char *network_id) +{ + qdr_action_t *action = qdr_action(qdr_core_set_network_id_CT, "set_network_id"); + action->args.general.context_1 = (void*) network_id; + qdr_action_enqueue(core, action); +} void qdr_core_stop_thread_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (!discard) { @@ -390,9 +405,9 @@ void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode) } -const char *qdr_core_van_id(const qdr_core_t *core) +const char *qdr_core_tenant_id(const qdr_core_t *core) { - return core->van_id; + return core->tenant_id; } ALLOC_DECLARE(qdr_field_t); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 6337cd490..48aad5e93 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -601,6 +601,8 @@ struct qdr_address_t { uint64_t deliveries_from_container; uint64_t deliveries_egress_route_container; uint64_t deliveries_ingress_route_container; + uint64_t deliveries_egress_inter_network; + uint64_t deliveries_ingress_inter_network; uint64_t deliveries_redirected; ///@} @@ -869,7 +871,8 @@ struct qdr_core_t { qd_router_mode_t router_mode; const char *router_area; const char *router_id; - const char *van_id; + const char *tenant_id; + char *network_id; int worker_thread_count; qdr_address_config_list_t addr_config; @@ -918,6 +921,8 @@ struct qdr_core_t { uint64_t deliveries_transit; uint64_t deliveries_egress_route_container; uint64_t deliveries_ingress_route_container; + uint64_t deliveries_egress_inter_network; + uint64_t deliveries_ingress_inter_network; uint64_t deliveries_delayed_1sec; uint64_t deliveries_delayed_10sec; uint64_t deliveries_stuck; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 5e67b4c1e..7e916c834 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -556,8 +556,10 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery if (qdr_connection_route_container(link->conn)) { addr->deliveries_ingress_route_container++; core->deliveries_ingress_route_container++; + } else if (qdr_connection_inter_network(link->conn)) { + core->deliveries_ingress_inter_network++; + addr->deliveries_ingress_inter_network++; } - } } else { // diff --git a/src/router_private.h b/src/router_private.h index e2a0270e3..901e26ef9 100644 --- a/src/router_private.h +++ b/src/router_private.h @@ -57,7 +57,7 @@ struct qd_router_t { qd_router_mode_t router_mode; const char *router_area; const char *router_id; - const char *van_id; + const char *tenant_id; sys_mutex_t lock; qd_timer_t *timer; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 04c0b4225..503a89624 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -176,6 +176,7 @@ foreach(py_test_module system_tests_http1_decoder system_tests_cert_rotation system_tests_split_path + system_tests_cross_network ) string(CONFIGURE "${PYTHON_TEST_COMMAND}" CONFIGURED_PYTHON_TEST_COMMAND) diff --git a/tests/field_test.c b/tests/field_test.c index 092fa9bac..deaa1094c 100644 --- a/tests/field_test.c +++ b/tests/field_test.c @@ -313,27 +313,34 @@ static char *verify_iterator(void *context, qd_iterator_t *iter, static char* test_view_address_hash(void *context) { struct {const char *addr; const char *view;} cases[] = { - {"amqp:/_local/my-addr/sub", "Lmy-addr/sub"}, - {"amqp:/_local/my-addr", "Lmy-addr"}, - {"_local/my-addr", "Lmy-addr"}, - {"amqp:/_topo/area/router/local/sub", "Aarea"}, - {"amqp:/_topo/my-area/router/local/sub", "Rrouter"}, - {"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"}, - {"amqp:/_topo/area/all/local/sub", "Aarea"}, - {"amqp:/_topo/my-area/all/local/sub", "Tlocal/sub"}, - {"amqp:/_topo/all/all/local/sub", "Tlocal/sub"}, - {"amqp://host:port/_local/my-addr", "Lmy-addr"}, - {"_topo/area/router/my-addr", "Aarea"}, - {"_topo/my-area/router/my-addr", "Rrouter"}, - {"_topo/my-area/my-router/my-addr", "Lmy-addr"}, - {"_topo/my-area/router", "Rrouter"}, - {"amqp:/mobile", "Mmobile"}, - {"mobile", "Mmobile"}, - {"/mobile", "Mmobile"}, - {"amqp:/_edge/router/sub", "Hrouter"}, - {"_edge/router/sub", "Hrouter"}, - - // Re-run the above tests to make sure trailing dots are ignored. + {"amqp:/_local/my-addr/sub", "Lmy-addr/sub"}, + {"amqp:/_local/my-addr", "Lmy-addr"}, + {"_local/my-addr", "Lmy-addr"}, + {"amqp:/_topo/area/router/local/sub", "Aarea"}, + {"amqp:/_topo/my-area/router/local/sub", "Rrouter"}, + {"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"}, + {"amqp:/_topo/area/all/local/sub", "Aarea"}, + {"amqp:/_topo/my-area/all/local/sub", "Tlocal/sub"}, + {"amqp:/_topo/all/all/local/sub", "Tlocal/sub"}, + {"amqp://host:port/_local/my-addr", "Lmy-addr"}, + {"_topo/area/router/my-addr", "Aarea"}, + {"_topo/my-area/router/my-addr", "Rrouter"}, + {"_topo/my-area/my-router/my-addr", "Lmy-addr"}, + {"_topo/my-area/router", "Rrouter"}, + {"amqp:/mobile", "Mmobile"}, + {"mobile", "Mmobile"}, + {"/mobile", "Mmobile"}, + {"amqp:/_edge/router/sub", "Hrouter"}, + {"_edge/router/sub", "Hrouter"}, + {"_xnet/network/area/router/my-addr", "Nnetwork"}, + {"_xnet/network", "Nnetwork"}, + {"_xnet/my-network/area/router/my-addr", "Aarea"}, + {"_xnet/my-network/my-area/router/my-addr", "Rrouter"}, + {"_xnet/my-network/my-area/my-router/my-addr", "Lmy-addr"}, + {"_xedge/network/router/sub", "Nnetwork"}, + {"_xedge/my-network/router/sub", "Hrouter"}, + + // Re-run some of the above tests to make sure trailing dots are ignored. {"amqp:/_local/my-addr/sub.", "Lmy-addr/sub"}, {"amqp:/_local/my-addr.", "Lmy-addr"}, {"amqp:/_topo/area/router/local/sub.", "Aarea"}, @@ -404,6 +411,11 @@ static char* test_view_address_hash_edge(void *context) {"amqp:/_edge/edgerouter-2/sub", "Hedgerouter-2"}, {"_edge/edgerouter-3/sub", "Hedgerouter-3"}, {"_edge/edgerouter-4/sub", "L_edge"}, + {"_xedge/network/router/sub", "L_edge"}, + {"_xedge/my-network/router/sub", "L_edge"}, + {"_xedge/my-network/my-router/sub", "Lsub"}, + {"_xedge/my-network/edgerouter-1/sub", "Hedgerouter-1"}, + {"_xedge/my-network/edgerouter-4/sub", "L_edge"}, {0, 0} }; @@ -988,6 +1000,7 @@ int field_tests(void) char *test_group = "field_tests"; qd_iterator_set_address(false, "my-area", "my-router"); + qd_iterator_set_network("my-network"); TEST_CASE(test_view_global_dns, 0); TEST_CASE(test_view_global_non_dns, 0); @@ -1013,6 +1026,7 @@ int field_tests(void) TEST_CASE(test_iterator_copy_octet, 0); qd_iterator_set_address(true, "my-area", "my-router"); + qd_iterator_set_network("my-network"); TEST_CASE(test_view_address_hash_edge, 0); return result; diff --git a/tests/system_tests_cross_network.py b/tests/system_tests_cross_network.py new file mode 100644 index 000000000..70b2d53c8 --- /dev/null +++ b/tests/system_tests_cross_network.py @@ -0,0 +1,251 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton import Message, symbol +from proton.handlers import MessagingHandler +from proton.reactor import Container, LinkOption + +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout +from system_test import unittest + + +class CrossNetworkOption(LinkOption): + def apply(self, link): + if link.is_receiver: + link.source.capabilities.put_object(symbol('qd.cross-network')) + + +class CrossNetworkPreconfiguredTest(TestCase): + """System tests involving a single router""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(CrossNetworkPreconfiguredTest, cls).setUpClass() + + def router(name, mode, netid, connection1, connection2=None, extra=None, args=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('network', {'networkId': netid}), + ('listener', {'port': cls.tester.get_port()}), + ('address', {'prefix': 'cl', 'distribution': 'closest'}), + connection1 + ] + + if connection2: + config.append(connection2) + if extra: + if extra.__class__ == list: + for e in extra: + config.append(e) + else: + config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args or [])) + + cls.routers = [] + + inter_router_port_X = cls.tester.get_port() + inter_router_port_Y = cls.tester.get_port() + cross_network_port = cls.tester.get_port() + edge_port_X = cls.tester.get_port() + edge_port_Y = cls.tester.get_port() + + router('NET.X.INT.A', 'interior', 'net-x', + ('listener', {'role': 'inter-router', 'port': inter_router_port_X, 'linkCapacity': '10'}), + ('listener', {'name': 'cross-net', 'role': 'inter-network', 'port': cross_network_port, 'linkCapacity': '10'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'externalAddress': '_xnet/net-x'})) + router('NET.X.INT.B', 'interior', 'net-x', + ('connector', {'role': 'inter-router', 'port': inter_router_port_X, 'linkCapacity': '10'}), + ('listener', {'role': 'edge', 'port': edge_port_X, 'linkCapacity': '10'})) + router('EX', 'edge', 'net-x', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_X, 'linkCapacity': '10'})) + + router('NET.Y.INT.A', 'interior', 'net-y', + ('listener', {'role': 'inter-router', 'port': inter_router_port_Y, 'linkCapacity': '10'}), + ('connector', {'name': 'cross-net', 'role': 'inter-network', 'port': cross_network_port, 'linkCapacity': '10'}), + [('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service01'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service02'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service03'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service04'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service05'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service06'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service07'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service08'}), + ('autoLink', {'connection': 'cross-net', 'direction': 'in', 'address': 'service09'})]) + router('NET.Y.INT.B', 'interior', 'net-y', + ('connector', {'role': 'inter-router', 'port': inter_router_port_Y, 'linkCapacity': '10'}), + ('listener', {'role': 'edge', 'port': edge_port_Y, 'linkCapacity': '10'})) + router('EY', 'edge', 'net-y', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_Y, 'linkCapacity': '10'})) + + cls.routers[0].wait_router_connected('NET.X.INT.B') + cls.routers[1].wait_router_connected('NET.X.INT.A') + + cls.routers[3].wait_router_connected('NET.Y.INT.B') + cls.routers[4].wait_router_connected('NET.Y.INT.A') + + cls.routers[1].is_edge_routers_connected(num_edges=1) + cls.routers[4].is_edge_routers_connected(num_edges=1) + + cls.routers[0].wait_address('EX', 0, 1, 1) + cls.routers[4].wait_address('net-x', 0, 1, 1) + + cls.xinta = cls.routers[0].addresses[0] + cls.xintb = cls.routers[1].addresses[0] + cls.ex = cls.routers[2].addresses[0] + cls.yinta = cls.routers[3].addresses[0] + cls.yintb = cls.routers[4].addresses[0] + cls.ey = cls.routers[5].addresses[0] + + #print("export XINTA=%s" % cls.xinta) + #print("export XINTB=%s" % cls.xintb) + #print("export YINTA=%s" % cls.yinta) + #print("export YINTB=%s" % cls.yintb) + #print("export EX=%s" % cls.ex) + #print("export EY=%s" % cls.ey) + + def _run(self, test): + test.run() + self.assertIsNone(test.error) + + def test_01_local_to_local(self): + self._run(ClientServerTest(self.xinta, self.yinta, self.yinta, 'service01')) + + def test_02_distant_to_local(self): + self._run(ClientServerTest(self.xintb, self.yinta, self.yinta, 'service02')) + + def test_03_local_to_distant(self): + self._run(ClientServerTest(self.xinta, self.yintb, self.yinta, 'service03')) + + def test_04_distant_to_distant(self): + self._run(ClientServerTest(self.xintb, self.yintb, self.yinta, 'service04')) + + def test_05_local_to_edge(self): + self._run(ClientServerTest(self.xinta, self.ey, self.yinta, 'service05')) + + def test_06_distant_to_edge(self): + self._run(ClientServerTest(self.xintb, self.ey, self.yinta, 'service06')) + + def test_07_edge_to_local(self): + self._run(ClientServerTest(self.ex, self.yinta, self.yinta, 'service07')) + + def test_08_edge_to_distant(self): + self._run(ClientServerTest(self.ex, self.yintb, self.yinta, 'service08')) + + def test_09_edge_to_edge(self): + self._run(ClientServerTest(self.ex, self.ey, self.yinta, 'service09')) + + +class ClientServerTest(MessagingHandler): + def __init__(self, client_address, server_address, probe_address, addr): + super(ClientServerTest, self).__init__() + self.client_address = client_address + self.server_address = server_address + self.probe_address = probe_address + self.addr = addr + self.error = None + self.client_sender = None + self.client_receiver = None + self.server_sender = None + self.server_receiver = None + self.probe_sender = None + self.timer = None + self.client_conn = None + self.server_conn = None + self.probe_conn = None + self.reply_to = None + self.sequence = 0 + self.n_client_sent = 0 + self.n_client_rcvd = 0 + self.n_server_sent = 0 + self.n_server_rcvd = 0 + self.n_attached = 0 + self.n_client_released = 0 + self.n_server_released = 0 + self.first_rel_sequence = None + self.last_rel_sequence = None + self.count = 25 + + def timeout(self): + self.error = "Timeout Expired - client_sent=%d, server_rcvd=%d, server_sent=%d, client_rcvd=%d, attached=%d, client_released=%d, server_released=%d, first_rel=%r, last_rel=%r, reply_to=%s" % \ + (self.n_client_sent, self.n_server_rcvd, self.n_server_sent, self.n_client_rcvd, self.n_attached, self.n_client_released, self.n_server_released, self.first_rel_sequence, self.last_rel_sequence, self.reply_to) + self.client_conn.close() + self.server_conn.close() + self.probe_conn.close() + + def fail(self, cause): + self.error = cause + self.client_conn.close() + self.server_conn.close() + self.probe_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + self.client_conn = event.container.connect(self.client_address) + self.server_conn = event.container.connect(self.server_address) + self.probe_conn = event.container.connect(self.probe_address) + self.client_receiver = event.container.create_receiver(self.client_conn, None, dynamic=True, options=CrossNetworkOption()) + self.server_sender = event.container.create_sender(self.server_conn, None) + self.server_receiver = event.container.create_receiver(self.server_conn, self.addr) + + def on_link_opened(self, event): + self.n_attached += 1 + if event.receiver == self.client_receiver: + self.reply_to = event.receiver.remote_source.address + if self.n_attached == 3: + self.probe_sender = event.container.create_sender(self.probe_conn, self.addr) + + def on_sendable(self, event): + if event.sender == self.probe_sender and not self.client_sender: + self.client_sender = event.container.create_sender(self.client_conn, self.addr) + elif event.sender == self.client_sender: + while self.n_client_sent < self.count and self.client_sender.credit > 0: + delivery = self.client_sender.send(Message(address=self.addr, reply_to=self.reply_to, body={'sequence': self.sequence})) + delivery._sequence = self.sequence + self.n_client_sent += 1 + self.sequence += 1 + + def on_message(self, event): + if event.receiver == self.server_receiver: + self.n_server_rcvd += 1 + msg = event.message + delivery = self.server_sender.send(Message(address=msg.reply_to, body=msg.body)) + delivery._sequence = msg.body['sequence'] + self.n_server_sent += 1 + elif event.receiver == self.client_receiver: + self.n_client_rcvd += 1 + if self.n_client_rcvd == self.count: + self.fail(None) + + def on_released(self, event): + if event.sender == self.client_sender: + self.n_client_released += 1 + # Client releases are ok because they are caused by normal propagation delays of the mobile service address + self.n_client_sent -= 1 + if not self.first_rel_sequence: + self.first_rel_sequence = event.delivery._sequence + self.last_rel_sequence = event.delivery._sequence + elif event.sender == self.server_sender: + self.n_server_released += 1 + + def run(self): + Container(self).run() + + +if __name__ == '__main__': + unittest.main(main_module()) diff --git a/tools/skstat.in b/tools/skstat.in index 182e12e12..0bbb43f7e 100755 --- a/tools/skstat.in +++ b/tools/skstat.in @@ -278,6 +278,8 @@ class BusManager: return "" if addr[0] == 'M' : return "mobile" + if addr[0] == 'N' : + return "network" if addr[0] == 'R' : return "router" if addr[0] == 'A' : @@ -286,10 +288,6 @@ class BusManager: return "local" if addr[0] == 'T' : return "topo" - if addr[0] in 'CE' : - return "link-in" - if addr[0] in 'DF' : - return "link-out" if addr[0] == 'H' : return "edge" return "unknown: %s" % addr[0] @@ -406,6 +404,8 @@ class BusManager: rows.append(('Transit Count', PlainNum(router_metrics.deliveriesTransit))) rows.append(('Deliveries from Route Container', PlainNum(router_metrics.deliveriesIngressRouteContainer))) rows.append(('Deliveries to Route Container', PlainNum(router_metrics.deliveriesEgressRouteContainer))) + rows.append(('Deliveries from Remote Network', PlainNum(router_metrics.deliveriesIngressInterNetwork))) + rows.append(('Deliveries to Remote Network', PlainNum(router_metrics.deliveriesEgressInterNetwork))) except: pass