diff --git a/.gitmodules b/.gitmodules index 39ce5120a7..70ad60d8ad 100644 --- a/.gitmodules +++ b/.gitmodules @@ -238,3 +238,7 @@ path = vendor/nim-quic url = https://github.com/vacp2p/nim-quic branch = main +[submodule "vendor/nim-async-channels"] + path = vendor/nim-async-channels + url = https://github.com/status-im/nim-async-channels + branch = master diff --git a/execution_chain/conf.nim b/execution_chain/conf.nim index 703bc2f6e3..0ee528c9d4 100644 --- a/execution_chain/conf.nim +++ b/execution_chain/conf.nim @@ -447,6 +447,12 @@ type defaultValue: false name: "engine-api" .}: bool + engineApiChannelEnabled* {. + hidden + desc: "Enable the Engine API Channel" + defaultValue: false + name: "debug-engine-api-channel" .}: bool + engineApiPort* {. desc: "Listening port for the Engine API(http and ws)" defaultValue: defaultEngineApiPort @@ -771,7 +777,7 @@ func getAllowedOrigins*(config: ExecutionClientConf): seq[Uri] = result.add parseUri(item) func engineApiServerEnabled*(config: ExecutionClientConf): bool = - config.engineApiEnabled or config.engineApiWsEnabled + config.engineApiEnabled or config.engineApiWsEnabled or config.engineApiChannelEnabled func shareServerWithEngineApi*(config: ExecutionClientConf): bool = config.engineApiServerEnabled and diff --git a/execution_chain/el_sync.nim b/execution_chain/el_sync.nim index a9775a2a6a..de25b841df 100644 --- a/execution_chain/el_sync.nim +++ b/execution_chain/el_sync.nim @@ -18,7 +18,8 @@ import web3/[engine_api, primitives, conversions], beacon_chain/consensus_object_pools/blockchain_dag, beacon_chain/el/[el_manager, engine_api_conversions], - beacon_chain/spec/[forks, presets, state_transition_block] + beacon_chain/spec/[forks, presets, state_transition_block], + json_rpc/client logScope: topics = "elsync" @@ -87,24 +88,15 @@ proc findSlot( Opt.some importedSlot -proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} = +proc syncToEngineApi*(dag: ChainDAGRef, rpcClient: RpcClient) {.async.} = # Takes blocks from the CL and sends them to the EL - the attempt is made # optimistically until something unexpected happens (reorg etc) at which point # the process ends let # Create the client for the engine api - # And exchange the capabilities for a test communication - web3 = await url.newWeb3() - rpcClient = web3.provider (lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig() - defer: - try: - await web3.close() - except: - discard - # Load the EL state detials and create the beaconAPI client var elBlockNumber = uint64(await rpcClient.eth_blockNumber()) diff --git a/execution_chain/nimbus.nim b/execution_chain/nimbus.nim index cd2307bdb1..679a918e56 100644 --- a/execution_chain/nimbus.nim +++ b/execution_chain/nimbus.nim @@ -16,15 +16,15 @@ proc workaround*(): int {.exportc.} = return int(Future[Quantity]().internalValue) import - std/[os, net, options, strformat, terminal, typetraits], + std/[os, net, options, terminal, typetraits], stew/io2, chronos/threadsync, chronicles, metrics, metrics/chronos_httpserver, - nimcrypto/sysrand, eth/enr/enr, eth/net/nat, + json_rpc/rpcchannels, eth/p2p/discoveryv5/random2, beacon_chain/spec/[engine_authentication], beacon_chain/validators/keystore_management, @@ -36,7 +36,6 @@ import nimbus_binary_common, process_state, ], - ./rpc/jwt_auth, ./[ constants, conf as ecconf, @@ -170,13 +169,13 @@ type tcpPort: Port udpPort: Port elSync: bool + channel: RpcChannelPtrs ExecutionThreadConfig = object tsp: ThreadSignalPtr tcpPort: Port udpPort: Option[Port] - -var jwtKey: JwtSharedKey + channel: RpcChannelPtrs proc dataDir*(config: NimbusConf): string = string config.dataDirFlag.get( @@ -190,14 +189,14 @@ proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} = notice "Waiting failed", err = exc.msg proc elSyncLoop( - dag: ChainDAGRef, url: EngineApiUrl + dag: ChainDAGRef, elManager: ELManager ) {.async: (raises: [CancelledError]).} = while true: await sleepAsync(12.seconds) # TODO trigger only when the EL needs syncing try: - await syncToEngineApi(dag, url) + await syncToEngineApi(dag, elManager.channel()) except CatchableError as exc: # This can happen when the EL is busy doing some work, specially on # startup @@ -208,17 +207,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} = stderr.writeLine error # Logging not yet set up quit QuitFailure - let engineUrl = EngineApiUrl.init( - &"http://127.0.0.1:{defaultEngineApiPort}/", Opt.some(@(distinctBase(jwtKey))) - ) - config.metricsEnabled = false - config.elUrls = - @[ - EngineApiUrlConfigValue( - url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey)) - ) - ] + config.elUrls = @[EngineApiUrlConfigValue(channel: Opt.some(p.channel))] config.statusBarEnabled = false # Multi-threading issues due to logging config.tcpPort = p.tcpPort config.udpPort = p.udpPort @@ -244,7 +234,7 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} = return if p.elSync: - discard elSyncLoop(node.dag, engineUrl) + discard elSyncLoop(node.dag, node.elManager) dynamicLogScope(comp = "bn"): if node.nickname != "": @@ -259,11 +249,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} = proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} = var config = makeConfig(ignoreUnknown = true) config.metricsEnabled = false - config.engineApiEnabled = true - config.engineApiPort = Port(defaultEngineApiPort) - config.engineApiAddress = defaultAdminListenAddress - config.jwtSecret.reset() - config.jwtSecretValue = some toHex(distinctBase(jwtKey)) + config.engineApiEnabled = false + config.engineApiChannelEnabled = true config.agentString = "nimbus" config.tcpPort = p.tcpPort config.udpPortFlag = p.udpPort @@ -281,16 +268,14 @@ proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} = let com = setupCommonRef(config) dynamicLogScope(comp = "ec"): - nimbus_execution_client.runExeClient(config, com, p.tsp.justWait()) + nimbus_execution_client.runExeClient( + config, com, p.tsp.justWait(), channel = Opt.some p.channel + ) # Stop the other thread as well, in case `runExeClient` stopped early waitFor p.tsp.fire() proc runCombinedClient() = - # Make it harder to connect to the (internal) engine - this will of course - # go away - discard randomBytes(distinctBase(jwtKey)) - const banner = "Nimbus v0.0.1" var config = NimbusConf.loadWithBanners(banner, copyright, [specBanner], true).valueOr: @@ -329,6 +314,9 @@ proc runCombinedClient() = "Baked-in KZG setup is correct" ) + var channel: RpcChannel + let pairs = channel.open().expect("working channel") + var bnThread: Thread[BeaconThreadConfig] let bnStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr") createThread( @@ -339,6 +327,7 @@ proc runCombinedClient() = tcpPort: config.beaconTcpPort.get(config.tcpPort.get(Port defaultEth2TcpPort)), udpPort: config.beaconUdpPort.get(config.udpPort.get(Port defaultEth2TcpPort)), elSync: config.elSync, + channel: pairs, ), ) @@ -361,6 +350,7 @@ proc runCombinedClient() = some(Port(uint16(config.udpPort.get()) + 1)) else: none(Port), + channel: pairs, ), ) diff --git a/execution_chain/nimbus_desc.nim b/execution_chain/nimbus_desc.nim index eb2c294e4e..85c1577ea1 100644 --- a/execution_chain/nimbus_desc.nim +++ b/execution_chain/nimbus_desc.nim @@ -22,7 +22,8 @@ import ./sync/snap as snap_sync, ./sync/wire_protocol, ./beacon/beacon_engine, - ./common + ./common, + json_rpc/rpcchannels when enabledLogLevel == TRACE: import std/sequtils @@ -44,6 +45,7 @@ type NimbusNode* = ref object httpServer*: NimbusHttpServerRef engineApiServer*: NimbusHttpServerRef + engineApiChannel*: RpcChannelServer ethNode*: EthereumNode fc*: ForkedChainRef txPool*: TxPoolRef diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim index a4d88093af..5b5570a037 100644 --- a/execution_chain/nimbus_execution_client.nim +++ b/execution_chain/nimbus_execution_client.nim @@ -208,14 +208,14 @@ proc setupP2P(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) = nimbus.beaconSyncRef = BeaconSyncRef(nil) nimbus.snapSyncRef = SnapSyncRef(nil) -proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) = +proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]) = nimbus.accountsManager = new AccountsManager nimbus.rng = newRng() basicServices(nimbus, config, com) manageAccounts(nimbus, config) setupP2P(nimbus, config, com) - setupRpc(nimbus, config, com) + setupRpc(nimbus, config, com, channel) # Not starting any syncer if there is definitely no way to run it. This # avoids polling (i.e. waiting for instructions) and some logging. @@ -232,9 +232,9 @@ proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) = nimbus.beaconSyncRef = BeaconSyncRef(nil) nimbus.snapSyncRef = SnapSyncRef(nil) -proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef): T = +proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]): T = let nimbus = T() - nimbus.init(config, com) + nimbus.init(config, com, channel) nimbus proc preventLoadingDataDirForTheWrongNetwork(db: CoreDbRef; config: ExecutionClientConf) = @@ -306,6 +306,7 @@ proc runExeClient*( com: CommonRef, stopper: StopFuture, nimbus = NimbusNode(nil), + channel = Opt.none(RpcChannelPtrs), ) = ## Launches and runs the execution client for pre-configured `nimbus` and ## `conf` argument descriptors. @@ -313,9 +314,9 @@ proc runExeClient*( var nimbus = nimbus if nimbus.isNil: - nimbus = NimbusNode.init(config, com) + nimbus = NimbusNode.init(config, com, channel) else: - nimbus.init(config, com) + nimbus.init(config, com, channel) defer: let diff --git a/execution_chain/rpc.nim b/execution_chain/rpc.nim index dbc859a417..69a7f32913 100644 --- a/execution_chain/rpc.nim +++ b/execution_chain/rpc.nim @@ -12,7 +12,7 @@ import chronicles, websock/websock, - json_rpc/rpcserver, + json_rpc/[rpcserver, rpcchannels], ./rpc/[common, cors, debug, engine_api, jwt_auth, rpc_server, server_api], ./[conf, nimbus_desc] @@ -23,7 +23,8 @@ export jwt_auth, cors, rpc_server, - server_api + server_api, + rpcchannels const DefaultChunkSize = 1024*1024 @@ -53,7 +54,6 @@ func installRPC(server: RpcServer, if RpcFlag.Debug in flags: setupDebugRpc(com, nimbus.txPool, server) - proc newRpcWebsocketHandler(): RpcWebSocketHandler = let rng = HmacDrbgContext.new() RpcWebSocketHandler( @@ -198,8 +198,8 @@ proc addServices(handlers: var seq[RpcHandlerProc], handlers.addHandler(server) proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf, - com: CommonRef) = - if not config.engineApiEnabled: + com: CommonRef, channel: Opt[RpcChannelPtrs]) = + if not config.engineApiEnabled and channel.isNone(): warn "Engine API disabled, the node will not respond to consensus client updates (enable with `--engine-api`)" if not config.serverEnabled: @@ -257,3 +257,10 @@ proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf, quit(QuitFailure) nimbus.engineApiServer = res.get nimbus.engineApiServer.start() + + if channel.isSome(): + nimbus.engineApiChannel = RpcChannelServer.new(channel[]) + + setupEngineAPI(nimbus.beaconEngine, nimbus.engineApiChannel) + installRPC(nimbus.engineApiChannel, nimbus, config, com, serverApi, {RpcFlag.Eth}) + nimbus.engineApiChannel.start() diff --git a/vendor/nim-async-channels b/vendor/nim-async-channels new file mode 160000 index 0000000000..e63ac36343 --- /dev/null +++ b/vendor/nim-async-channels @@ -0,0 +1 @@ +Subproject commit e63ac3634373facf14984433a7a366503ea1d0f7 diff --git a/vendor/nim-json-rpc b/vendor/nim-json-rpc index 0d25b6d6d2..f0302c9b11 160000 --- a/vendor/nim-json-rpc +++ b/vendor/nim-json-rpc @@ -1 +1 @@ -Subproject commit 0d25b6d6d2c25bcfbe6f827a5fc79730cef6fbf4 +Subproject commit f0302c9b11cc15032ce2b6e2a15e5a5da800a4cc diff --git a/vendor/nimbus-eth2 b/vendor/nimbus-eth2 index 688b3eee11..affaea5b5e 160000 --- a/vendor/nimbus-eth2 +++ b/vendor/nimbus-eth2 @@ -1 +1 @@ -Subproject commit 688b3eee111ac809bb73dffb8e1fe612d53c302a +Subproject commit affaea5b5ed69f5ca20754f58bdf8d8d7dc73255