diff --git a/charts/fsm/components/scripts.tar.gz b/charts/fsm/components/scripts.tar.gz index 55266cb80..48d390b55 100644 Binary files a/charts/fsm/components/scripts.tar.gz and b/charts/fsm/components/scripts.tar.gz differ diff --git a/charts/fsm/components/scripts/gateways/filters/http/TranscodeDubbo.js b/charts/fsm/components/scripts/gateways/filters/http/TranscodeDubbo.js new file mode 100644 index 000000000..e87f5749c --- /dev/null +++ b/charts/fsm/components/scripts/gateways/filters/http/TranscodeDubbo.js @@ -0,0 +1,42 @@ +export default function (config) { + var conf = config.transcodeDubbo + var version = conf.version || '' + var service = conf.service || '' + var method = conf.method || '' + var signature = conf.signature || '' + + var $ctx + var requestID = 0 + + var pl = pipeline($=>$ + .onStart(c => { $ctx = c }) + .replaceMessage( + req => { + var body = req.body + var json = body.size > 0 ? JSON.decode(req.body) : [] + var params = (json instanceof Array ? json : [json]) + return new Message( + { + requestID: ++requestID, + isRequest: true, + isTwoWay: true, + serializationType: 2, + }, + Hessian.encode([ + '2.0.2', service, version, method, signature, ...params, null + ] + )) + } + ) + .pipeNext() + .replaceMessage( + res => { + var results = Hessian.decode(res.body) + return new Message(JSON.encode(results)) + } + ) + ) + + pl.meta = { outputProtocol: 'dubbo' } + return pl +} diff --git a/charts/fsm/components/scripts/gateways/filters/udp/DNSModifier.js b/charts/fsm/components/scripts/gateways/filters/udp/DNSModifier.js index b4cade587..adcd82405 100644 --- a/charts/fsm/components/scripts/gateways/filters/udp/DNSModifier.js +++ b/charts/fsm/components/scripts/gateways/filters/udp/DNSModifier.js @@ -33,7 +33,7 @@ export default function (config) { return new Message(data) } }) - .demux().to($=>$ + .demuxQueue().to($=>$ .replaceMessage( msg => { $question = DNS.decode(msg.body) diff --git a/charts/fsm/components/scripts/gateways/main.js b/charts/fsm/components/scripts/gateways/main.js index 0a129f0fc..e7a644a65 100755 --- a/charts/fsm/components/scripts/gateways/main.js +++ b/charts/fsm/components/scripts/gateways/main.js @@ -28,3 +28,5 @@ resources.list('Gateway').forEach(gw => { startGateway(gw) } }) + +console.info('FGW started') diff --git a/charts/fsm/components/scripts/gateways/modules/backend-selector.js b/charts/fsm/components/scripts/gateways/modules/backend-selector.js index d85677985..b3b10c1bf 100644 --- a/charts/fsm/components/scripts/gateways/modules/backend-selector.js +++ b/charts/fsm/components/scripts/gateways/modules/backend-selector.js @@ -8,15 +8,13 @@ var listenerFilterCaches = new algo.Cache( ) export default function (protocol, listener, rule, makeBalancer) { - var ruleFilters = [ - ...listenerFilterCaches.get(protocol).get(listener), - ...makeFilters(protocol, rule?.filters), - ] + var routeFilters = listenerFilterCaches.get(protocol).get(listener) + var ruleFilters = makeFilters(routeFilters.outputProtocol || protocol, rule?.filters) var refs = rule?.backendRefs || [] if (refs.length > 1) { var lb = new algo.LoadBalancer( - refs.map(ref => makeBackendTarget(ruleFilters, ref)), + refs.map(ref => makeBackendTarget(ref)), { key: t => t.id, weight: t => t.weight, @@ -24,22 +22,24 @@ export default function (protocol, listener, rule, makeBalancer) { ) return (hint) => lb.allocate(hint) } else { - var singleSelection = { target: makeBackendTarget(ruleFilters, refs[0]) } + var singleSelection = { target: makeBackendTarget(refs[0]) } return () => singleSelection } - function makeBackendTarget(ruleFilters, backendRef) { + function makeBackendTarget(backendRef) { var backendResource = findBackendResource(backendRef) + var backendFilters = makeFilters(ruleFilters.outputProtocol || protocol, backendRef?.filters) var filters = [ + ...routeFilters, ...ruleFilters, - ...makeFilters(protocol, backendRef?.filters), + ...backendFilters, ] return { id: backendRef?.name, backendRef, backendResource, weight: backendRef?.weight || 1, - pipeline: makeBalancer(backendRef, backendResource, filters) + pipeline: makeBalancer(backendRef, backendResource, filters, backendFilters.outputProtocol || protocol) } } diff --git a/charts/fsm/components/scripts/gateways/modules/backend.js b/charts/fsm/components/scripts/gateways/modules/backend.js index 17010741d..28deba938 100644 --- a/charts/fsm/components/scripts/gateways/modules/backend.js +++ b/charts/fsm/components/scripts/gateways/modules/backend.js @@ -94,7 +94,7 @@ var cache = new algo.Cache( }) } else { $.pipe(() => $protocol, { - 'tcp': $=>$.connect(() => $target.address), + 'tcp': $=>$.connect(() => $target.address, { idleTimeout: 0 }), 'udp': $=>$.connect(() => $target.address, { protocol: 'udp' }), }) } diff --git a/charts/fsm/components/scripts/gateways/modules/balancer-dubbo.js b/charts/fsm/components/scripts/gateways/modules/balancer-dubbo.js index dd6239be3..755f1731f 100644 --- a/charts/fsm/components/scripts/gateways/modules/balancer-dubbo.js +++ b/charts/fsm/components/scripts/gateways/modules/balancer-dubbo.js @@ -63,7 +63,7 @@ export default function (backendRef, backendResource) { target: $session.target, } }) - $.mux(() => $session).to($=>$ + $.muxQueue(() => $session).to($=>$ .encodeDubbo() .pipe(backend.connect, () => $conn) .decodeDubbo() diff --git a/charts/fsm/components/scripts/gateways/modules/balancer.js b/charts/fsm/components/scripts/gateways/modules/balancer.js new file mode 100644 index 000000000..3c9db21c3 --- /dev/null +++ b/charts/fsm/components/scripts/gateways/modules/balancer.js @@ -0,0 +1,14 @@ +import makeHTTP from './balancer-http.js' +import makeTCP from './balancer-tcp.js' +import makeUDP from './balancer-udp.js' +import makeDubbo from './balancer-dubbo.js' + +export default function (protocol, backendRef, backendResource, options) { + switch (protocol) { + case 'http': return makeHTTP(backendRef, backendResource, options?.gateway, Boolean(options?.isHTTP2)) + case 'tcp': return makeTCP(backendRef, backendResource, options?.gateway) + case 'udp': return makeUDP(backendRef, backendResource) + case 'dubbo': return makeDubbo(backendRef, backendResource) + default: throw `Invalid protocol '${protocol}' for balancer` + } +} diff --git a/charts/fsm/components/scripts/gateways/modules/router-dubbo.js b/charts/fsm/components/scripts/gateways/modules/router-dubbo.js index dc10d6f11..a4fac6072 100644 --- a/charts/fsm/components/scripts/gateways/modules/router-dubbo.js +++ b/charts/fsm/components/scripts/gateways/modules/router-dubbo.js @@ -1,6 +1,6 @@ import resources from '../resources.js' import makeBackendSelector from './backend-selector.js' -import makeBalancer from './balancer-dubbo.js' +import makeBalancer from './balancer.js' import { log } from '../utils.js' var response404 = pipeline($=>$.replaceMessage(new Message({ status: 404 }))) @@ -84,7 +84,7 @@ export default function (routerKey, listener, routeResources) { var handleStream = pipeline($=>$ .decodeDubbo() - .demux().to(handleRequest) + .demuxQueue().to(handleRequest) .encodeDubbo() ) @@ -95,10 +95,17 @@ export default function (routerKey, listener, routeResources) { } function makeRouter(listener, routeResources) { + var cache = new algo.Cache({ ttl: 3600 }) var selector = makeRuleSelector(routeResources) return function (head, body) { - $selection = selector(body) + var key = body.slice(1,5).join(' ') + var val = cache.get(key) + if (val) { + $selection = val + } else { + cache.set(key, $selection = selector(body)) + } log?.( `Inb #${$ctx.parent.inbound.id} Req #${$ctx.parent.messageCount+1}`, head.requestID, `backend ${$selection?.target?.backendRef?.name}`, @@ -192,10 +199,9 @@ function makeRouter(listener, routeResources) { function makeBackendSelectorForRule(rule) { var selector = makeBackendSelector( 'dubbo', listener, rule, - - function (backendRef, backendResource, filters) { + function (backendRef, backendResource, filters, protocol) { if (!backendResource && filters.length === 0) return response500 - var forwarder = backendResource ? [makeBalancer(backendRef, backendResource)] : [] + var forwarder = backendResource ? [makeBalancer(protocol, backendRef, backendResource)] : [] return pipeline($=>$ .pipe([...filters, ...forwarder], () => $ctx) .onEnd(() => $selection.free?.()) diff --git a/charts/fsm/components/scripts/gateways/modules/router-http.js b/charts/fsm/components/scripts/gateways/modules/router-http.js index 838cd4a22..50cc9c661 100644 --- a/charts/fsm/components/scripts/gateways/modules/router-http.js +++ b/charts/fsm/components/scripts/gateways/modules/router-http.js @@ -1,6 +1,6 @@ import resources from '../resources.js' import makeBackendSelector from './backend-selector.js' -import makeBalancer from './balancer-http.js' +import makeBalancer from './balancer.js' import makeSessionPersistence from './session-persistence.js' import { log, stringifyHTTPHeaders } from '../utils.js' @@ -377,12 +377,14 @@ function makeRouter(listener, routeResources, gateway) { var selector = makeBackendSelector( 'http', listener, rule, - function (backendRef, backendResource, filters) { + function (backendRef, backendResource, filters, protocol) { if (!backendResource && filters.length === 0) return response500 - var forwarder = backendResource ? [makeBalancer(backendRef, backendResource, gateway, isHTTP2)] : [] + var forwarder = backendResource ? [makeBalancer(protocol, backendRef, backendResource, { gateway, isHTTP2 })] : [] - if (retryPipeline) forwarder.unshift(retryPipeline) - if (timeoutPipeline) forwarder.unshift(timeoutPipeline) + if (protocol === 'http') { + if (retryPipeline) forwarder.unshift(retryPipeline) + if (timeoutPipeline) forwarder.unshift(timeoutPipeline) + } if (sessionPersistence) { var preserveSession = sessionPersistence.preserve diff --git a/charts/fsm/components/scripts/gateways/modules/router-tcp.js b/charts/fsm/components/scripts/gateways/modules/router-tcp.js index c04f8ecbb..01f88e46d 100644 --- a/charts/fsm/components/scripts/gateways/modules/router-tcp.js +++ b/charts/fsm/components/scripts/gateways/modules/router-tcp.js @@ -1,6 +1,6 @@ import resources from '../resources.js' import makeBackendSelector from './backend-selector.js' -import makeBalancer from './balancer-tcp.js' +import makeBalancer from './balancer.js' import { log } from '../utils.js' var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd)) @@ -35,8 +35,8 @@ function makeRouter(listener, routeResources, gateway) { var selector = makeBackendSelector( 'tcp', listener, routeResources[0]?.spec?.rules?.[0], - function (backendRef, backendResource, filters) { - var forwarder = backendResource ? makeBalancer(backendRef, backendResource, gateway) : shutdown + function (backendRef, backendResource, filters, protocol) { + var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource, { gateway }) : shutdown return pipeline($=>$ .pipe([...filters, forwarder], () => $ctx) .onEnd(() => $selection.free?.()) diff --git a/charts/fsm/components/scripts/gateways/modules/router-tls.js b/charts/fsm/components/scripts/gateways/modules/router-tls.js index 20d2fa2fd..4916e28ac 100644 --- a/charts/fsm/components/scripts/gateways/modules/router-tls.js +++ b/charts/fsm/components/scripts/gateways/modules/router-tls.js @@ -1,6 +1,6 @@ import resources from '../resources.js' import makeBackendSelector from './backend-selector.js' -import makeBalancer from './balancer-tcp.js' +import makeBalancer from './balancer.js' import { log } from '../utils.js' var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd)) @@ -56,8 +56,8 @@ function makeRouter(listener, routeResources, gateway) { hostnames.forEach(name => { var selector = makeBackendSelector( 'tcp', listener, r.spec.rules?.[0], - function (backendRef, backendResource, filters) { - var forwarder = backendResource ? makeBalancer(backendRef, backendResource, gateway) : shutdown + function (backendRef, backendResource, filters, protocol) { + var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource, { gateway }) : shutdown return pipeline($=>$ .pipe([...filters, forwarder], () => $ctx) .onEnd(() => $selection.free?.()) diff --git a/charts/fsm/components/scripts/gateways/modules/router-udp.js b/charts/fsm/components/scripts/gateways/modules/router-udp.js index 650e6bf66..21a944c07 100644 --- a/charts/fsm/components/scripts/gateways/modules/router-udp.js +++ b/charts/fsm/components/scripts/gateways/modules/router-udp.js @@ -1,6 +1,6 @@ import resources from '../resources.js' import makeBackendSelector from './backend-selector.js' -import makeBalancer from './balancer-udp.js' +import makeBalancer from './balancer.js' import { log } from '../utils.js' var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd)) @@ -35,8 +35,8 @@ function makeRouter(listener, routeResources) { var selector = makeBackendSelector( 'udp', listener, routeResources[0]?.spec?.rules?.[0], - function (backendRef, backendResource, filters) { - var forwarder = backendResource ? makeBalancer(backendRef, backendResource) : shutdown + function (backendRef, backendResource, filters, protocol) { + var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource) : shutdown return pipeline($=>$ .pipe([...filters, forwarder], () => $ctx) .onEnd(() => $selection.free?.()) diff --git a/charts/fsm/components/scripts/gateways/modules/session-persistence.js b/charts/fsm/components/scripts/gateways/modules/session-persistence.js index e4ebf65fa..e9f382c16 100644 --- a/charts/fsm/components/scripts/gateways/modules/session-persistence.js +++ b/charts/fsm/components/scripts/gateways/modules/session-persistence.js @@ -56,9 +56,9 @@ export default function (sessionPersistence) { } function makeHeaderSessionKeyGetter() { - var headerName = sessionPresistance.sessionName + var headerName = sessionPersistence.sessionName return (head) => head.headers[headerName] } return { restore, preserve } -} +} \ No newline at end of file diff --git a/charts/fsm/components/scripts/gateways/startup.js b/charts/fsm/components/scripts/gateways/startup.js index 1f4bbb94a..4e6e5a197 100644 --- a/charts/fsm/components/scripts/gateways/startup.js +++ b/charts/fsm/components/scripts/gateways/startup.js @@ -79,7 +79,7 @@ function makeListener(gateway, listener) { var $ctx - pipy.listen(port, wireProto, $=>$ + pipy.listen(port, wireProto, { idleTimeout: 0 }, $=>$ .onStart(i => { $ctx = { inbound: i, diff --git a/charts/fsm/components/scripts/gateways/utils.js b/charts/fsm/components/scripts/gateways/utils.js index d6be230bd..eb67b685a 100644 --- a/charts/fsm/components/scripts/gateways/utils.js +++ b/charts/fsm/components/scripts/gateways/utils.js @@ -62,8 +62,8 @@ export function findPolicies(kind, targetResource) { } export function makeFilters(protocol, filters) { - if (!filters) return [] - return filters.map( + filters = filters || [] + var pipelines = filters.map( config => { var maker = ( importFilter(`./config/filters/${protocol}/${config.type}.js`) || @@ -71,9 +71,13 @@ export function makeFilters(protocol, filters) { ) if (!maker) throw `${protocol} filter not found: ${config.type}` if (typeof maker !== 'function') throw `filter ${config.type} is not a function` - return maker(config, resources) + var pl = maker(config, resources) + protocol = pl?.meta?.outputProtocol || protocol + return pl } ) + pipelines.outputProtocol = protocol + return pipelines } function importFilter(pathname) {