Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions etc/skrouterd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

router {
mode: standalone
transportPlugin: reference
}

listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
* under the License.
*/

#include "entity.h"

#include "qpid/dispatch/entity.h"
#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/atomic.h"
#include "qpid/dispatch/dispatch.h"
Expand Down Expand Up @@ -95,4 +94,17 @@ void qd_set_condition_on_vflow(pn_raw_connection_t *raw_conn, vflow_record_t *vf

qd_observer_t get_listener_observer(const char *observer);

typedef void* (*configure_entity_t) (qd_dispatch_t *qd, qd_entity_t *entity);
typedef void* (*update_entity_t) (qd_dispatch_t *qd, qd_entity_t *entity, void *impl);
typedef void (*delete_entity_t) (qd_dispatch_t *qd, void *impl);
typedef qd_error_t (*refresh_entity_t) (qd_entity_t* entity, void *impl);

void qd_register_tcp_management_handlers(configure_entity_t configure_tcp_listener,
configure_entity_t configure_tcp_connector,
update_entity_t update_tcp_listener,
delete_entity_t delete_tcp_listener,
delete_entity_t delete_tcp_connector,
refresh_entity_t refresh_tcp_listener,
refresh_entity_t refresh_tcp_connector);

#endif // __adaptor_common_h__
File renamed without changes.
19 changes: 19 additions & 0 deletions include/qpid/dispatch/io_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,24 @@ void qdr_register_adaptor(const char *name,
qdr_adaptor_final_t on_final,
uint32_t ordinal);

/**
* Declaration of a protocol adaptor
*
* A protocol adaptor may declare itself by invoking the QDR_CORE_ADAPTOR_DECLARE macro in its body.
*
* @param name A null-terminated literal string naming the module
* @param on_init Pointer to a function for adaptor initialization, called at core thread startup
* @param on_final Pointer to a function for adaptor finalization, called at core thread shutdown
*/
#define QDR_TRANSPORT_MODULE_DECLARE(name, on_init, on_final) \
static void transportstart(void) __attribute__((constructor)); \
void transportstart(void) \
{ \
qdr_register_transport_module(name, on_init, on_final); \
}
void qdr_register_transport_module(const char *name,
qdr_adaptor_init_t on_init,
qdr_adaptor_final_t on_final);


#endif
24 changes: 24 additions & 0 deletions include/qpid/dispatch/service_transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef __service_transport_h__
#define __service_transport_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

void st_post_routing_query(const char* routing_key, void* reply_handle);

#endif
6 changes: 6 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@
"description": "Terminate corresponding TCP connections when a tcpListener or a tcpConnector is deleted.",
"required": false,
"create": true
},
"transportPlugin": {
"type": "string",
"description": "If supplied, this is the name of the service-transport plugin to use for service traffic",
"required": false,
"create": true
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ set(qpid_dispatch_SOURCES
observers/http2/http2_observer.c
decoders/http1/http1_decoder.c
decoders/http2/http2_decoder.c
transport_modules/reference.c
alloc.c
alloc_pool.c
aprintf.c
Expand Down Expand Up @@ -88,6 +89,7 @@ set(qpid_dispatch_SOURCES
python_embedded.c
router_agent.c
router_config.c
service_transport.c
platform.c
vanflow.c
router.c
Expand Down
88 changes: 87 additions & 1 deletion src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
#include "adaptor_common.h"
#include "qpid/dispatch/adaptor_common.h"
#include "tcp/tcp_adaptor.h"

#include "qpid/dispatch/amqp.h"
Expand Down Expand Up @@ -156,3 +156,89 @@ void qd_set_vflow_netaddr_string(vflow_record_t *vflow, pn_raw_connection_t *pn_
vflow_set_string(vflow, ingress ? VFLOW_ATTRIBUTE_SOURCE_PORT : VFLOW_ATTRIBUTE_PROXY_PORT, remote_port);
}
}

static struct {
bool configured;
configure_entity_t configure_tcp_listener;
configure_entity_t configure_tcp_connector;
update_entity_t update_tcp_listener;
delete_entity_t delete_tcp_listener;
delete_entity_t delete_tcp_connector;
refresh_entity_t refresh_tcp_listener;
refresh_entity_t refresh_tcp_connector;
} management_calls = {false,0,0,0,0,0,0,0};

void qd_register_tcp_management_handlers(configure_entity_t configure_tcp_listener,
configure_entity_t configure_tcp_connector,
update_entity_t update_tcp_listener,
delete_entity_t delete_tcp_listener,
delete_entity_t delete_tcp_connector,
refresh_entity_t refresh_tcp_listener,
refresh_entity_t refresh_tcp_connector)
{
assert(!management_calls.configured);
management_calls.configured = true;
management_calls.configure_tcp_listener = configure_tcp_listener;
management_calls.configure_tcp_connector = configure_tcp_connector;
management_calls.update_tcp_listener = update_tcp_listener;
management_calls.delete_tcp_listener = delete_tcp_listener;
management_calls.delete_tcp_connector = delete_tcp_connector;
management_calls.refresh_tcp_listener = refresh_tcp_listener;
management_calls.refresh_tcp_connector = refresh_tcp_connector;
}

QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
if (management_calls.configured) {
return management_calls.configure_tcp_listener(qd, entity);
}
return 0;
}

QD_EXPORT void *qd_dispatch_update_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity, void *impl)
{
if (management_calls.configured) {
return management_calls.update_tcp_listener(qd, entity, impl);
}
return 0;
}

QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
{
if (management_calls.configured) {
management_calls.delete_tcp_listener(qd, impl);
}
}

QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
{
if (management_calls.configured) {
management_calls.delete_tcp_connector(qd, impl);
}
}

QD_EXPORT qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
{
if (management_calls.configured) {
return management_calls.refresh_tcp_listener(entity, impl);
}
return QD_ERROR_NONE;
}

QD_EXPORT void *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
if (management_calls.configured) {
return management_calls.configure_tcp_connector(qd, entity);
}
return 0;
}

QD_EXPORT qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
{
if (management_calls.configured) {
return management_calls.refresh_tcp_connector(entity, impl);
}
return QD_ERROR_NONE;
}


1 change: 0 additions & 1 deletion src/adaptors/adaptor_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

#include "adaptor_listener.h"
#include "adaptor_common.h"
#include "server_private.h"

#include <qpid/dispatch/protocol_adaptor.h>
Expand Down
3 changes: 1 addition & 2 deletions src/adaptors/adaptor_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
//

#include <qpid/dispatch/protocol_adaptor.h>

#include "adaptor_common.h"
#include <qpid/dispatch/adaptor_common.h>

typedef struct qd_adaptor_listener_t qd_adaptor_listener_t;

Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "qd_connector.h"
#include "server_config.h"
#include "dispatch_private.h"
#include "entity.h"
#include "qpid/dispatch/entity.h"

#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/failoverlist.h"
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/qd_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "qd_connector.h"
#include "qd_connection.h"
#include "private.h"
#include "entity.h"
#include "qpid/dispatch/entity.h"

#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/timer.h"
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/qd_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "private.h"
#include "qd_connection.h"
#include "http.h"
#include "entity.h"
#include "qpid/dispatch/entity.h"

#include "qpid/dispatch/server.h"
#include "qpid/dispatch/log.h"
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/server_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "container.h"
#include "server_config.h"
#include "dispatch_private.h"
#include "entity.h"
#include "qpid/dispatch/entity.h"

#include <qpid/dispatch/amqp_adaptor.h>
#include <qpid/dispatch/log.h>
Expand Down
43 changes: 29 additions & 14 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2361,7 +2361,7 @@ static void CORE_connection_trace(void *context, qdr_connection_t *conn, bool tr
// Entrypoints for Management
//=================================================================================

QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
void *configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
SET_THREAD_UNKNOWN;
qd_tcp_listener_t *listener = new_qd_tcp_listener_t();
Expand Down Expand Up @@ -2414,7 +2414,7 @@ QD_EXPORT void *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_
/**
* Handles tcpListener record update request from management agent.
*/
QD_EXPORT void *qd_dispatch_update_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity, void *impl)
void *update_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity, void *impl)
{
SET_THREAD_UNKNOWN;
qd_error_clear();
Expand All @@ -2441,8 +2441,7 @@ QD_EXPORT void *qd_dispatch_update_tcp_listener(qd_dispatch_t *qd, qd_entity_t *
return listener;
}


QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
void delete_tcp_listener(qd_dispatch_t *qd, void *impl)
{

SET_THREAD_UNKNOWN;
Expand Down Expand Up @@ -2490,8 +2489,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
}
}


QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
void delete_tcp_connector(qd_dispatch_t *qd, void *impl)
{
SET_THREAD_UNKNOWN;
qd_tcp_connector_t *connector = (qd_tcp_connector_t*) impl;
Expand Down Expand Up @@ -2537,8 +2535,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
}
}


QD_EXPORT qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
qd_error_t refresh_tcp_listener(qd_entity_t* entity, void *impl)
{
SET_THREAD_UNKNOWN;
uint64_t co = 0;
Expand Down Expand Up @@ -2566,8 +2563,7 @@ QD_EXPORT qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *im
return qd_error_code();
}


qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
void *configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
SET_THREAD_UNKNOWN;
qd_tcp_connector_t *connector = new_qd_tcp_connector_t();
Expand Down Expand Up @@ -2617,8 +2613,7 @@ qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_en
return connector;
}


QD_EXPORT qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
qd_error_t refresh_tcp_connector(qd_entity_t* entity, void *impl)
{
SET_THREAD_UNKNOWN;
qd_tcp_connector_t *cr = (qd_tcp_connector_t*) impl;
Expand All @@ -2642,10 +2637,26 @@ QD_EXPORT qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *i
//=================================================================================
// Interface to Protocol Adaptor registration
//=================================================================================
static void ADAPTOR_init(qdr_core_t *core, void **adaptor_context)
static void TRANSPORT_init(qdr_core_t *core, void **adaptor_context)
{
SET_THREAD_UNKNOWN;
const char *chosen = qdr_core_dispatch(core)->transport_plugin;
if (!!chosen) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "Initialization suppressed. There's an alternate service transport enabled.");
*adaptor_context = 0;
return;
}

qd_register_tcp_management_handlers(configure_tcp_listener,
configure_tcp_connector,
update_tcp_listener,
delete_tcp_listener,
delete_tcp_connector,
refresh_tcp_listener,
refresh_tcp_connector);

tcp_context = NEW(qd_tcp_context_t);
*adaptor_context = tcp_context;
ZERO(tcp_context);

tcp_context->core = core;
Expand Down Expand Up @@ -2703,6 +2714,10 @@ static void ADAPTOR_init(qdr_core_t *core, void **adaptor_context)
static void ADAPTOR_final(void *adaptor_context)
{
SET_THREAD_UNKNOWN;
if (!adaptor_context) {
return;
}

qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO, "Shutting down TCP protocol adaptor");
while (DEQ_HEAD(tcp_context->listeners)) {
qd_tcp_listener_t *listener = DEQ_HEAD(tcp_context->listeners);
Expand Down Expand Up @@ -2746,4 +2761,4 @@ static void ADAPTOR_final(void *adaptor_context)
/**
* Declare the adaptor so that it will self-register on process startup.
*/
QDR_CORE_ADAPTOR_DECLARE("tcp", ADAPTOR_init, ADAPTOR_final)
QDR_CORE_ADAPTOR_DECLARE("tcp", TRANSPORT_init, ADAPTOR_final)
Loading
Loading