Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified charts/fsm/components/scripts.tar.gz
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
export default function (config) {
var conf = config.transcodeDubbo
var version = conf.version || ''
var service = conf.service || ''
var method = conf.method || ''
var signature = conf.signature || ''

var $ctx
var requestID = 0

var pl = pipeline($=>$
.onStart(c => { $ctx = c })
.replaceMessage(
req => {
var body = req.body
var json = body.size > 0 ? JSON.decode(req.body) : []
var params = (json instanceof Array ? json : [json])
return new Message(
{
requestID: ++requestID,
isRequest: true,
isTwoWay: true,
serializationType: 2,
},
Hessian.encode([
'2.0.2', service, version, method, signature, ...params, null
]
))
}
)
.pipeNext()
.replaceMessage(
res => {
var results = Hessian.decode(res.body)
return new Message(JSON.encode(results))
}
)
)

pl.meta = { outputProtocol: 'dubbo' }
return pl
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export default function (config) {
return new Message(data)
}
})
.demux().to($=>$
.demuxQueue().to($=>$
.replaceMessage(
msg => {
$question = DNS.decode(msg.body)
Expand Down
2 changes: 2 additions & 0 deletions charts/fsm/components/scripts/gateways/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ resources.list('Gateway').forEach(gw => {
startGateway(gw)
}
})

console.info('FGW started')
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,38 @@ var listenerFilterCaches = new algo.Cache(
)

export default function (protocol, listener, rule, makeBalancer) {
var ruleFilters = [
...listenerFilterCaches.get(protocol).get(listener),
...makeFilters(protocol, rule?.filters),
]
var routeFilters = listenerFilterCaches.get(protocol).get(listener)
var ruleFilters = makeFilters(routeFilters.outputProtocol || protocol, rule?.filters)

var refs = rule?.backendRefs || []
if (refs.length > 1) {
var lb = new algo.LoadBalancer(
refs.map(ref => makeBackendTarget(ruleFilters, ref)),
refs.map(ref => makeBackendTarget(ref)),
{
key: t => t.id,
weight: t => t.weight,
}
)
return (hint) => lb.allocate(hint)
} else {
var singleSelection = { target: makeBackendTarget(ruleFilters, refs[0]) }
var singleSelection = { target: makeBackendTarget(refs[0]) }
return () => singleSelection
}

function makeBackendTarget(ruleFilters, backendRef) {
function makeBackendTarget(backendRef) {
var backendResource = findBackendResource(backendRef)
var backendFilters = makeFilters(ruleFilters.outputProtocol || protocol, backendRef?.filters)
var filters = [
...routeFilters,
...ruleFilters,
...makeFilters(protocol, backendRef?.filters),
...backendFilters,
]
return {
id: backendRef?.name,
backendRef,
backendResource,
weight: backendRef?.weight || 1,
pipeline: makeBalancer(backendRef, backendResource, filters)
pipeline: makeBalancer(backendRef, backendResource, filters, backendFilters.outputProtocol || protocol)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var cache = new algo.Cache(
})
} else {
$.pipe(() => $protocol, {
'tcp': $=>$.connect(() => $target.address),
'tcp': $=>$.connect(() => $target.address, { idleTimeout: 0 }),
'udp': $=>$.connect(() => $target.address, { protocol: 'udp' }),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default function (backendRef, backendResource) {
target: $session.target,
}
})
$.mux(() => $session).to($=>$
$.muxQueue(() => $session).to($=>$
.encodeDubbo()
.pipe(backend.connect, () => $conn)
.decodeDubbo()
Expand Down
14 changes: 14 additions & 0 deletions charts/fsm/components/scripts/gateways/modules/balancer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import makeHTTP from './balancer-http.js'
import makeTCP from './balancer-tcp.js'
import makeUDP from './balancer-udp.js'
import makeDubbo from './balancer-dubbo.js'

export default function (protocol, backendRef, backendResource, options) {
switch (protocol) {
case 'http': return makeHTTP(backendRef, backendResource, options?.gateway, Boolean(options?.isHTTP2))
case 'tcp': return makeTCP(backendRef, backendResource, options?.gateway)
case 'udp': return makeUDP(backendRef, backendResource)
case 'dubbo': return makeDubbo(backendRef, backendResource)
default: throw `Invalid protocol '${protocol}' for balancer`
}
}
18 changes: 12 additions & 6 deletions charts/fsm/components/scripts/gateways/modules/router-dubbo.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import resources from '../resources.js'
import makeBackendSelector from './backend-selector.js'
import makeBalancer from './balancer-dubbo.js'
import makeBalancer from './balancer.js'
import { log } from '../utils.js'

var response404 = pipeline($=>$.replaceMessage(new Message({ status: 404 })))
Expand Down Expand Up @@ -84,7 +84,7 @@ export default function (routerKey, listener, routeResources) {

var handleStream = pipeline($=>$
.decodeDubbo()
.demux().to(handleRequest)
.demuxQueue().to(handleRequest)
.encodeDubbo()
)

Expand All @@ -95,10 +95,17 @@ export default function (routerKey, listener, routeResources) {
}

function makeRouter(listener, routeResources) {
var cache = new algo.Cache({ ttl: 3600 })
var selector = makeRuleSelector(routeResources)

return function (head, body) {
$selection = selector(body)
var key = body.slice(1,5).join(' ')
var val = cache.get(key)
if (val) {
$selection = val
} else {
cache.set(key, $selection = selector(body))
}
log?.(
`Inb #${$ctx.parent.inbound.id} Req #${$ctx.parent.messageCount+1}`, head.requestID,
`backend ${$selection?.target?.backendRef?.name}`,
Expand Down Expand Up @@ -192,10 +199,9 @@ function makeRouter(listener, routeResources) {
function makeBackendSelectorForRule(rule) {
var selector = makeBackendSelector(
'dubbo', listener, rule,

function (backendRef, backendResource, filters) {
function (backendRef, backendResource, filters, protocol) {
if (!backendResource && filters.length === 0) return response500
var forwarder = backendResource ? [makeBalancer(backendRef, backendResource)] : []
var forwarder = backendResource ? [makeBalancer(protocol, backendRef, backendResource)] : []
return pipeline($=>$
.pipe([...filters, ...forwarder], () => $ctx)
.onEnd(() => $selection.free?.())
Expand Down
12 changes: 7 additions & 5 deletions charts/fsm/components/scripts/gateways/modules/router-http.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import resources from '../resources.js'
import makeBackendSelector from './backend-selector.js'
import makeBalancer from './balancer-http.js'
import makeBalancer from './balancer.js'
import makeSessionPersistence from './session-persistence.js'
import { log, stringifyHTTPHeaders } from '../utils.js'

Expand Down Expand Up @@ -377,12 +377,14 @@ function makeRouter(listener, routeResources, gateway) {
var selector = makeBackendSelector(
'http', listener, rule,

function (backendRef, backendResource, filters) {
function (backendRef, backendResource, filters, protocol) {
if (!backendResource && filters.length === 0) return response500
var forwarder = backendResource ? [makeBalancer(backendRef, backendResource, gateway, isHTTP2)] : []
var forwarder = backendResource ? [makeBalancer(protocol, backendRef, backendResource, { gateway, isHTTP2 })] : []

if (retryPipeline) forwarder.unshift(retryPipeline)
if (timeoutPipeline) forwarder.unshift(timeoutPipeline)
if (protocol === 'http') {
if (retryPipeline) forwarder.unshift(retryPipeline)
if (timeoutPipeline) forwarder.unshift(timeoutPipeline)
}

if (sessionPersistence) {
var preserveSession = sessionPersistence.preserve
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import resources from '../resources.js'
import makeBackendSelector from './backend-selector.js'
import makeBalancer from './balancer-tcp.js'
import makeBalancer from './balancer.js'
import { log } from '../utils.js'

var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd))
Expand Down Expand Up @@ -35,8 +35,8 @@ function makeRouter(listener, routeResources, gateway) {
var selector = makeBackendSelector(
'tcp', listener,
routeResources[0]?.spec?.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeBalancer(backendRef, backendResource, gateway) : shutdown
function (backendRef, backendResource, filters, protocol) {
var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource, { gateway }) : shutdown
return pipeline($=>$
.pipe([...filters, forwarder], () => $ctx)
.onEnd(() => $selection.free?.())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import resources from '../resources.js'
import makeBackendSelector from './backend-selector.js'
import makeBalancer from './balancer-tcp.js'
import makeBalancer from './balancer.js'
import { log } from '../utils.js'

var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd))
Expand Down Expand Up @@ -56,8 +56,8 @@ function makeRouter(listener, routeResources, gateway) {
hostnames.forEach(name => {
var selector = makeBackendSelector(
'tcp', listener, r.spec.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeBalancer(backendRef, backendResource, gateway) : shutdown
function (backendRef, backendResource, filters, protocol) {
var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource, { gateway }) : shutdown
return pipeline($=>$
.pipe([...filters, forwarder], () => $ctx)
.onEnd(() => $selection.free?.())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import resources from '../resources.js'
import makeBackendSelector from './backend-selector.js'
import makeBalancer from './balancer-udp.js'
import makeBalancer from './balancer.js'
import { log } from '../utils.js'

var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd))
Expand Down Expand Up @@ -35,8 +35,8 @@ function makeRouter(listener, routeResources) {
var selector = makeBackendSelector(
'udp', listener,
routeResources[0]?.spec?.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeBalancer(backendRef, backendResource) : shutdown
function (backendRef, backendResource, filters, protocol) {
var forwarder = backendResource ? makeBalancer(protocol, backendRef, backendResource) : shutdown
return pipeline($=>$
.pipe([...filters, forwarder], () => $ctx)
.onEnd(() => $selection.free?.())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ export default function (sessionPersistence) {
}

function makeHeaderSessionKeyGetter() {
var headerName = sessionPresistance.sessionName
var headerName = sessionPersistence.sessionName
return (head) => head.headers[headerName]
}

return { restore, preserve }
}
}
2 changes: 1 addition & 1 deletion charts/fsm/components/scripts/gateways/startup.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function makeListener(gateway, listener) {

var $ctx

pipy.listen(port, wireProto, $=>$
pipy.listen(port, wireProto, { idleTimeout: 0 }, $=>$
.onStart(i => {
$ctx = {
inbound: i,
Expand Down
10 changes: 7 additions & 3 deletions charts/fsm/components/scripts/gateways/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,22 @@ export function findPolicies(kind, targetResource) {
}

export function makeFilters(protocol, filters) {
if (!filters) return []
return filters.map(
filters = filters || []
var pipelines = filters.map(
config => {
var maker = (
importFilter(`./config/filters/${protocol}/${config.type}.js`) ||
importFilter(`./filters/${protocol}/${config.type}.js`)
)
if (!maker) throw `${protocol} filter not found: ${config.type}`
if (typeof maker !== 'function') throw `filter ${config.type} is not a function`
return maker(config, resources)
var pl = maker(config, resources)
protocol = pl?.meta?.outputProtocol || protocol
return pl
}
)
pipelines.outputProtocol = protocol
return pipelines
}

function importFilter(pathname) {
Expand Down
Loading