Skip to content

Commit ff57764

Browse files
authored
feat(engine): add RouterFactorySpi for pluggable stream factory composition (#1757)
1 parent 5fd5f45 commit ff57764

26 files changed

Lines changed: 937 additions & 94 deletions

runtime/engine/AGENTS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ default implementation of the new method or the build will fail.
4646
## Test implementations for engine concepts
4747

4848
Every engine concept (binding, guard, vault, catalog, exporter, metric group,
49-
model, resolver) has a minimal **test implementation** that lives in this
49+
model, resolver, router) has a minimal **test implementation** that lives in this
5050
module's test sources under
5151
`src/test/java/.../engine/test/internal/<concept>/`. For example:
5252

@@ -60,6 +60,7 @@ module's test sources under
6060
| metric group | `TestMetricGroupFactorySpi`, `TestMetricGroup` |
6161
| model | `TestModelFactorySpi`, `TestModel`, `TestModelContext` |
6262
| resolver | `TestResolverFactorySpi`, `TestResolverSpi` |
63+
| router | `TestRouterFactorySpi`, `TestRouter`, `TestRouterContext` |
6364

6465
The engine module is built with Maven's `test-jar` packaging so these classes
6566
are published as `engine:<version>:test-jar`. Every `specs/*.spec` module

runtime/engine/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@
288288
<include>io/aklivity/zilla/runtime/engine/test/internal/metrics/**/*.class</include>
289289
<include>io/aklivity/zilla/runtime/engine/test/internal/model/**/*.class</include>
290290
<include>io/aklivity/zilla/runtime/engine/test/internal/resolver/**/*.class</include>
291+
<include>io/aklivity/zilla/runtime/engine/test/internal/router/**/*.class</include>
291292
<include>io/aklivity/zilla/runtime/engine/test/internal/store/**/*.class</include>
292293
<include>io/aklivity/zilla/runtime/engine/test/internal/vault/**/*.class</include>
293294
<include>io/aklivity/zilla/runtime/engine/internal/concurrent/bench/**/*.class</include>

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import io.aklivity.zilla.runtime.engine.catalog.Catalog;
5959
import io.aklivity.zilla.runtime.engine.config.KindConfig;
6060
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
61+
import io.aklivity.zilla.runtime.engine.config.RouterConfig;
6162
import io.aklivity.zilla.runtime.engine.diagnostic.EngineDiagnosticsTask;
6263
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactory;
6364
import io.aklivity.zilla.runtime.engine.exporter.Exporter;
@@ -77,6 +78,8 @@
7778
import io.aklivity.zilla.runtime.engine.metrics.MetricGroup;
7879
import io.aklivity.zilla.runtime.engine.model.Model;
7980
import io.aklivity.zilla.runtime.engine.namespace.NamespacedId;
81+
import io.aklivity.zilla.runtime.engine.router.Router;
82+
import io.aklivity.zilla.runtime.engine.router.RouterFactory;
8083
import io.aklivity.zilla.runtime.engine.store.Store;
8184
import io.aklivity.zilla.runtime.engine.vault.Vault;
8285

@@ -99,6 +102,7 @@ public final class Engine implements Collector, AutoCloseable
99102
private final EngineConfiguration config;
100103
private final EngineManager manager;
101104
private final EngineDiagnosticsTask diagnostics;
105+
private final RouterConfig routerConfig;
102106

103107
private final EventWriter eventWriter;
104108
private final AtomicBoolean closed;
@@ -190,13 +194,20 @@ public final class Engine implements Collector, AutoCloseable
190194
}
191195
}
192196

197+
final Router router = RouterFactory.instantiate().create(config.router(), config);
198+
final RouterConfig routerConfig = RouterConfig.builder()
199+
.id(0L)
200+
.name(router.name())
201+
.build();
202+
this.routerConfig = routerConfig;
203+
193204
List<EngineWorker> workers = new ArrayList<>(workerCount);
194205
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
195206
{
196207
EngineWorker worker =
197208
new EngineWorker(config, tasks, labels, diagnoseOnError, tuning::affinity, bindings, exporters,
198-
guards, vaults, catalogs, models, metricGroups, stores, this, this::supplyEventReader,
199-
eventFormatterFactory, workerIndex, readonly, this::process, boss);
209+
guards, vaults, catalogs, models, metricGroups, stores, router, routerConfig, this,
210+
this::supplyEventReader, eventFormatterFactory, workerIndex, readonly, this::process, boss);
200211
workers.add(worker);
201212
}
202213
this.workers = workers;
@@ -391,6 +402,11 @@ public Clock clock()
391402
return config.clock();
392403
}
393404

405+
public RouterConfig routerConfig()
406+
{
407+
return routerConfig;
408+
}
409+
394410
public static EngineBuilder builder()
395411
{
396412
return new EngineBuilder();

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class EngineConfiguration extends Configuration
107107
public static final PropertyDef<ErrorReporter> ENGINE_ERROR_REPORTER;
108108
public static final PropertyDef<RevocationStrategy> ENGINE_CERTIFICATE_REVOCATION_STRATEGY;
109109
public static final PropertyDef<Path> ENGINE_DIAGNOSTICS_DIRECTORY;
110+
public static final PropertyDef<String> ENGINE_ROUTER;
110111

111112
private static final ConfigurationDef ENGINE_CONFIG;
112113

@@ -175,6 +176,7 @@ public class EngineConfiguration extends Configuration
175176
EngineConfiguration::decodeRevocationStrategy, RevocationStrategy.NONE);
176177
ENGINE_DIAGNOSTICS_DIRECTORY = config.property(Path.class, "diagnostics.directory",
177178
EngineConfiguration::decodeDiagnosticsDirectory, (String) null);
179+
ENGINE_ROUTER = config.property("router", "engine");
178180
ENGINE_CONFIG = config;
179181
}
180182

@@ -425,6 +427,11 @@ public Function<String, InetAddress[]> hostResolver()
425427
return ENGINE_HOST_RESOLVER.get(this)::resolve;
426428
}
427429

430+
public String router()
431+
{
432+
return ENGINE_ROUTER.get(this);
433+
}
434+
428435
private static int defaultTaskParallelism(
429436
Configuration config)
430437
{
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.config;
17+
18+
import static java.util.Objects.requireNonNull;
19+
import static java.util.function.Function.identity;
20+
21+
public class RouterConfig
22+
{
23+
public final long id;
24+
public final String name;
25+
public final OptionsConfig options;
26+
27+
public static RouterConfigBuilder<RouterConfig> builder()
28+
{
29+
return new RouterConfigBuilder<>(identity());
30+
}
31+
32+
protected RouterConfig(
33+
long id,
34+
String name,
35+
OptionsConfig options)
36+
{
37+
this.id = id;
38+
this.name = requireNonNull(name);
39+
this.options = options;
40+
}
41+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.config;
17+
18+
import java.util.function.Function;
19+
20+
public final class RouterConfigBuilder<T> extends ConfigBuilder<T, RouterConfigBuilder<T>>
21+
{
22+
private final Function<RouterConfig, T> mapper;
23+
24+
private long id;
25+
private String name;
26+
private OptionsConfig options;
27+
28+
RouterConfigBuilder(
29+
Function<RouterConfig, T> mapper)
30+
{
31+
this.mapper = mapper;
32+
}
33+
34+
@Override
35+
@SuppressWarnings("unchecked")
36+
protected Class<RouterConfigBuilder<T>> thisType()
37+
{
38+
return (Class<RouterConfigBuilder<T>>) getClass();
39+
}
40+
41+
public RouterConfigBuilder<T> id(
42+
long id)
43+
{
44+
this.id = id;
45+
return this;
46+
}
47+
48+
public RouterConfigBuilder<T> name(
49+
String name)
50+
{
51+
this.name = name;
52+
return this;
53+
}
54+
55+
public <C extends ConfigBuilder<RouterConfigBuilder<T>, C>> C options(
56+
Function<Function<OptionsConfig, RouterConfigBuilder<T>>, C> options)
57+
{
58+
return options.apply(this::options);
59+
}
60+
61+
public RouterConfigBuilder<T> options(
62+
OptionsConfig options)
63+
{
64+
this.options = options;
65+
return this;
66+
}
67+
68+
@Override
69+
public T build()
70+
{
71+
return mapper.apply(new RouterConfig(id, name, options));
72+
}
73+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.internal.registry;
17+
18+
import java.util.function.Consumer;
19+
20+
import io.aklivity.zilla.runtime.engine.Configuration;
21+
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
22+
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
23+
import io.aklivity.zilla.runtime.engine.router.RouteableContext;
24+
25+
final class EngineRouteable implements RouteableContext
26+
{
27+
private final Configuration config;
28+
private final BindingHandler streamFactory;
29+
private final Consumer<NamespaceConfig> attachComposite;
30+
private final Consumer<NamespaceConfig> detachComposite;
31+
32+
EngineRouteable(
33+
Configuration config,
34+
BindingHandler streamFactory,
35+
Consumer<NamespaceConfig> attachComposite,
36+
Consumer<NamespaceConfig> detachComposite)
37+
{
38+
this.config = config;
39+
this.streamFactory = streamFactory;
40+
this.attachComposite = attachComposite;
41+
this.detachComposite = detachComposite;
42+
}
43+
44+
@Override
45+
public Configuration config()
46+
{
47+
return config;
48+
}
49+
50+
@Override
51+
public BindingHandler streamFactory()
52+
{
53+
return streamFactory;
54+
}
55+
56+
@Override
57+
public void attachComposite(
58+
NamespaceConfig composite)
59+
{
60+
attachComposite.accept(composite);
61+
}
62+
63+
@Override
64+
public void detachComposite(
65+
NamespaceConfig composite)
66+
{
67+
detachComposite.accept(composite);
68+
}
69+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.internal.registry;
17+
18+
import io.aklivity.zilla.runtime.engine.router.RouteableContext;
19+
import io.aklivity.zilla.runtime.engine.router.Router;
20+
import io.aklivity.zilla.runtime.engine.router.RouterContext;
21+
22+
public final class EngineRouter implements Router
23+
{
24+
public static final String NAME = "engine";
25+
26+
@Override
27+
public String name()
28+
{
29+
return NAME;
30+
}
31+
32+
@Override
33+
public RouterContext supply(
34+
RouteableContext context)
35+
{
36+
return new EngineRouterContext(context.streamFactory());
37+
}
38+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.internal.registry;
17+
18+
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
19+
import io.aklivity.zilla.runtime.engine.config.RouterConfig;
20+
import io.aklivity.zilla.runtime.engine.router.RouterContext;
21+
22+
final class EngineRouterContext implements RouterContext
23+
{
24+
private final BindingHandler streamFactory;
25+
26+
EngineRouterContext(
27+
BindingHandler streamFactory)
28+
{
29+
this.streamFactory = streamFactory;
30+
}
31+
32+
@Override
33+
public BindingHandler attach(
34+
RouterConfig config)
35+
{
36+
return streamFactory;
37+
}
38+
39+
@Override
40+
public void detach(
41+
long routerId)
42+
{
43+
}
44+
}

0 commit comments

Comments
 (0)