diff --git a/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart b/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart new file mode 100644 index 000000000..f2db949bf --- /dev/null +++ b/packages/ndk/lib/data_layer/data_sources/websocket_channel.dart @@ -0,0 +1,48 @@ +import 'dart:async'; + +import 'package:web_socket_channel/web_socket_channel.dart'; + +class WebsocketChannelDS { + final WebSocketChannel ws; + final String url; + + final Completer _readyCompleter = Completer(); + + WebsocketChannelDS(this.ws, this.url) { + ws.ready.then((_) { + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + }); + } + + StreamSubscription listen( + void Function(dynamic) onData, { + Function? onError, + void Function()? onDone, + }) { + return ws.stream.listen(onData, onDone: onDone, onError: onError); + } + + void send(dynamic data) { + return ws.sink.add(data); + } + + Future close() { + return ws.sink.close(); + } + + bool isOpen() { + final rdy = _readyCompleter.isCompleted; + final notClosed = ws.closeCode == null; + return rdy && notClosed; + } + + int? closeCode() { + return ws.closeCode; + } + + String? closeReason() { + return ws.closeReason; + } +} diff --git a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart index d8648994b..c82a282b5 100644 --- a/packages/ndk/lib/data_layer/data_sources/websocket_client.dart +++ b/packages/ndk/lib/data_layer/data_sources/websocket_client.dart @@ -26,20 +26,18 @@ class WebsocketDSClient { } bool isOpen() { - return ws.connection.state == Connected() || - ws.connection.state == Reconnected(); + final state = ws.connection.state; + return state is Connected || state is Reconnected; } int? closeCode() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).code - : null; + final state = ws.connection.state; + return state is Disconnected ? state.code : null; } String? closeReason() { - return ws.connection.state == Disconnected() - ? (ws.connection.state as Disconnected).reason - : null; + final state = ws.connection.state; + return state is Disconnected ? state.reason : null; } } // coverage:ignore-end diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart index cbb66dd44..cee7a44de 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart @@ -4,6 +4,7 @@ import 'package:web_socket_client/web_socket_client.dart'; import '../../../domain_layer/repositories/nostr_transport.dart'; import '../../../shared/logger/logger.dart'; +import '../../data_sources/websocket_channel.dart'; import '../../data_sources/websocket_client.dart'; /// A WebSocket-based implementation of the NostrTransport interface. @@ -22,28 +23,24 @@ class WebSocketClientNostrTransport implements NostrTransport { WebSocketClientNostrTransport(this._websocketDS, [Function? onReconnect]) { Completer completer = Completer(); ready = completer.future; + _stateStreamSubscription = _websocketDS.ws.connection.listen((state) { Logger.log.t("${_websocketDS.url} connection state changed to $state"); - switch (state) { - case Connected() || Reconnected(): + if (state is Connected || state is Reconnected) { + if (!completer.isCompleted) { completer.complete(); - if (state == Reconnected() && onReconnect != null) { - onReconnect.call(); - } - break; - case Disconnected(): - completer = Completer(); - ready = completer.future; - break; - case Connecting(): - // Do nothing, just waiting for connection to be established - break; - case Reconnecting(): - // Do nothing, just waiting for reconnection to be established - break; - default: - Logger.log.w( - "${_websocketDS.url} connection state changed to unknown state: $state"); + } + if (state is Reconnected && onReconnect != null) { + onReconnect.call(); + } + } else if (state is Disconnected) { + completer = Completer(); + ready = completer.future; + } else if (state is Connecting || state is Reconnecting) { + // Do nothing, just waiting for (re)connection to be established + } else { + Logger.log.w( + "${_websocketDS.url} connection state changed to unknown state: $state"); } }); } diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart index 207f59cd4..9ee4cfdc4 100644 --- a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_client_nostr_transport_factory.dart @@ -1,11 +1,18 @@ import 'package:ndk/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart'; +import 'package:ndk/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_client/web_socket_client.dart'; import '../../../domain_layer/repositories/nostr_transport.dart'; import '../../../shared/helpers/relay_helper.dart'; +import '../../data_sources/websocket_channel.dart'; import '../../data_sources/websocket_client.dart'; class WebSocketClientNostrTransportFactory implements NostrTransportFactory { + final bool useIsolate; + + WebSocketClientNostrTransportFactory({this.useIsolate = true}); + @override NostrTransport call(String url, Function? onReconnect) { final myUrl = cleanRelayUrl(url); @@ -14,6 +21,12 @@ class WebSocketClientNostrTransportFactory implements NostrTransportFactory { throw Exception("relayUrl is not parsable"); } + // Use isolate-based transport for better performance + if (useIsolate) { + return WebSocketIsolateNostrTransport(myUrl, onReconnect); + } + + // Fallback to regular transport final backoff = BinaryExponentialBackoff( initial: Duration(seconds: 1), maximumStep: 10); final client = WebSocket(Uri.parse(myUrl), backoff: backoff); diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart new file mode 100644 index 000000000..47486bc4f --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_entities.dart @@ -0,0 +1,104 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +/// Message types for isolate communication +enum _IsolateMessageType { + ready, + reconnecting, + message, + error, + done, +} + +/// Internal message class for communication between main isolate and worker isolate +class _IsolateMessage { + /// connection id is the cleaned relay url, (needed so reconnect, restore state works) + final String connectionId; + final _IsolateMessageType type; + final NostrMessageRaw? data; + final String? error; + final int? closeCode; + final String? closeReason; + + _IsolateMessage({ + required this.connectionId, + required this.type, + this.data, + this.error, + this.closeCode, + this.closeReason, + }); +} + +/// Base class for commands sent from main isolate to worker isolate +abstract class _IsolateCommand { + final String connectionId; + + _IsolateCommand({required this.connectionId}); +} + +class _ConnectCommand extends _IsolateCommand { + final String url; + + _ConnectCommand({required super.connectionId, required this.url}); +} + +class _SendCommand extends _IsolateCommand { + final dynamic data; + + _SendCommand({required super.connectionId, required this.data}); +} + +class _CloseCommand extends _IsolateCommand { + _CloseCommand({required super.connectionId}); +} + +enum NostrMessageRawType { + notice, + event, + eose, + ok, + closed, + auth, + unknown, +} + +//? needed until Nip01Event is refactored to be immutable +class Nip01EventRaw { + final String id; + + final String pubKey; + + final int createdAt; + + final int kind; + + final List> tags; + + final String content; + + final String sig; + + Nip01EventRaw({ + required this.id, + required this.pubKey, + required this.createdAt, + required this.kind, + required this.tags, + required this.content, + required this.sig, + }); +} + +class NostrMessageRaw { + final NostrMessageRawType type; + final Nip01EventRaw? nip01Event; + final String? requestId; + final dynamic otherData; + + NostrMessageRaw({ + required this.type, + this.nip01Event, + this.requestId, + this.otherData, + }); +} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart new file mode 100644 index 000000000..8c6f7f12c --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart @@ -0,0 +1,144 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:isolate'; + +import 'package:web_socket_client/web_socket_client.dart'; + +import '../../../../domain_layer/repositories/nostr_transport.dart'; +import '../../../../shared/helpers/relay_helper.dart'; +import '../../../../shared/logger/logger.dart'; + +part 'websocket_isolate_entities.dart'; +part 'websocket_isolate_nostr_transport_worker.dart'; +part 'websocket_isolate_nostr_transport_manager.dart'; + +class WebSocketIsolateNostrTransport implements NostrTransport { + final String url; + final Function? onReconnect; + final Completer _readyCompleter = Completer(); + final StreamController _messageController = + StreamController.broadcast(); + + late final String _connectionId; + final _WebSocketIsolateManager _manager = _WebSocketIsolateManager.instance; + + int? _closeCode; + String? _closeReason; + bool _isOpen = false; + bool _isInitialized = false; + + WebSocketIsolateNostrTransport(this.url, this.onReconnect) { + _initialize(); + } + + Future _initialize() async { + if (_isInitialized) return; + _isInitialized = true; + + try { + await _manager.ready; + + _connectionId = _manager._registerConnection( + controller: _messageController, + connectionId: url, + onStateChange: (state) { + // Handle state changes from isolate + switch (state) { + case _IsolateMessageType.ready: + _isOpen = true; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + break; + case _IsolateMessageType.reconnecting: + Logger.log.i("WebSocket reconnecting: $url"); + if (onReconnect != null) { + onReconnect!(); + } + break; + case _IsolateMessageType.done: + _isOpen = false; + break; + case _IsolateMessageType.message: + case _IsolateMessageType.error: + break; + } + }, + ); + + _manager.sendCommand( + _ConnectCommand( + connectionId: _connectionId, + url: url, + ), + ); + } catch (e) { + Logger.log.e("Failed to initialize WebSocket for $url: $e"); + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + + @override + Future get ready => _readyCompleter.future; + + @override + bool isOpen() => _isOpen; + + @override + int? closeCode() => _closeCode; + + @override + String? closeReason() => _closeReason; + + @override + void send(dynamic data) { + if (_isOpen) { + _manager.sendCommand( + _SendCommand( + connectionId: _connectionId, + data: data, + ), + ); + } else { + Logger.log.w("Attempted to send on closed/unready WebSocket: $url"); + } + } + + @override + Future close() async { + _manager.sendCommand( + _CloseCommand( + connectionId: _connectionId, + ), + ); + + await Future.delayed(Duration(milliseconds: 100)); + + _manager._unregisterConnection(_connectionId); + + if (!_messageController.isClosed) { + await _messageController.close(); + } + } + + @override + StreamSubscription listen( + void Function(NostrMessageRaw) onData, { + Function? onError, + void Function()? onDone, + }) { + return _messageController.stream + .listen(onData, onError: onError, onDone: onDone); + } + + @override + set ready(Future value) { + // No-op: ready is managed internally. + } +} + +void _isolateEntry(SendPort mainSendPort) { + _WebSocketIsolateWorker(mainSendPort); +} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart new file mode 100644 index 000000000..f37979dca --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_manager.dart @@ -0,0 +1,136 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +/// Singleton manager for the shared WebSocket isolate +class _WebSocketIsolateManager { + static _WebSocketIsolateManager? _instance; + static _WebSocketIsolateManager get instance { + _instance ??= _WebSocketIsolateManager._(); + return _instance!; + } + + Isolate? _isolate; + SendPort? _isolateSendPort; + final ReceivePort _receivePort = ReceivePort(); + StreamSubscription? _receivePortSubscription; + final Completer _readyCompleter = Completer(); + final Map> _connectionControllers = + {}; + final Map _stateCallbacks = {}; + + _WebSocketIsolateManager._() { + _initialize(); + } + + Future _initialize() async { + try { + _isolate = await Isolate.spawn( + _isolateEntry, + _receivePort.sendPort, + debugName: "WebSocketIsolateNostrTransportWorker", + ); + + _receivePortSubscription = _receivePort.listen((message) { + _handleIsolateMessage(message); + }); + } catch (e) { + Logger.log.e("Failed to spawn shared WebSocket isolate: $e"); + if (!_readyCompleter.isCompleted) { + _readyCompleter.completeError(e); + } + } + } + + void _handleIsolateMessage(dynamic message) { + // Handle batched messages + if (message is List<_IsolateMessage>) { + for (final msg in message) { + _processIsolateMessage(msg); + } + return; + } + + if (message is _IsolateMessage) { + _processIsolateMessage(message); + return; + } + + if (message is SendPort) { + _isolateSendPort = message; + if (!_readyCompleter.isCompleted) { + _readyCompleter.complete(); + } + return; + } + } + + void _processIsolateMessage(_IsolateMessage isolateMsg) { + final controller = _connectionControllers[isolateMsg.connectionId]; + if (controller == null) return; + + switch (isolateMsg.type) { + case _IsolateMessageType.message: + if (isolateMsg.data != null) { + controller.add(isolateMsg.data!); + } + break; + case _IsolateMessageType.error: + if (isolateMsg.error != null) { + controller.addError(isolateMsg.error!); + } + break; + case _IsolateMessageType.done: + if (!controller.isClosed) { + controller.close(); + } + break; + case _IsolateMessageType.ready: + case _IsolateMessageType.reconnecting: + // Notify state change via callback + final stateCallback = _stateCallbacks[isolateMsg.connectionId]; + if (stateCallback != null) { + stateCallback(isolateMsg.type); + } + break; + } + } + + String _registerConnection({ + required StreamController controller, + required void Function(_IsolateMessageType) onStateChange, + required String connectionId, + }) { + final id = connectionId; + _connectionControllers[id] = controller; + _stateCallbacks[id] = onStateChange; + return id; + } + + void _unregisterConnection(String connectionId) { + _connectionControllers.remove(connectionId); + _stateCallbacks.remove(connectionId); + } + + Future get ready => _readyCompleter.future; + + void sendCommand(_IsolateCommand command) { + if (_isolateSendPort != null) { + _isolateSendPort!.send(command); + } + } + + Future dispose() async { + for (final controller in _connectionControllers.values) { + if (!controller.isClosed) { + await controller.close(); + } + } + _connectionControllers.clear(); + _stateCallbacks.clear(); + + await _receivePortSubscription?.cancel(); + _receivePort.close(); + _isolate?.kill(priority: Isolate.immediate); + _isolate = null; + _instance = null; + } +} diff --git a/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart new file mode 100644 index 000000000..46866d63d --- /dev/null +++ b/packages/ndk/lib/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport_worker.dart @@ -0,0 +1,235 @@ +part of 'websocket_isolate_nostr_transport.dart'; + +class _WebSocketIsolateWorker { + final SendPort _mainSendPort; + final ReceivePort _receivePort = ReceivePort(); + final Map _connections = {}; + final Map> _subscriptions = {}; + + // Message batching + final List<_IsolateMessage> _messageQueue = []; + Timer? _batchTimer; + static const Duration _batchInterval = Duration(milliseconds: 10); + static const int _maxBatchSize = 100; + + _WebSocketIsolateWorker(this._mainSendPort) { + _mainSendPort.send(_receivePort.sendPort); + _receivePort.listen(_handleCommand); + _startBatchTimer(); + } + + void _startBatchTimer() { + _batchTimer = Timer.periodic(_batchInterval, (_) { + _flushMessageQueue(); + }); + } + + void _queueMessage(_IsolateMessage message) { + _messageQueue.add(message); + + // Flush if batch is full + if (_messageQueue.length >= _maxBatchSize) { + _flushMessageQueue(); + } + } + + void _flushMessageQueue() { + if (_messageQueue.isEmpty) return; + + // Send all messages in one batch + final batch = List<_IsolateMessage>.from(_messageQueue); + _messageQueue.clear(); + _mainSendPort.send(batch); + } + + void _handleCommand(dynamic message) { + if (message is _ConnectCommand) { + _connect(message.connectionId, message.url); + } else if (message is _SendCommand) { + _connections[message.connectionId]?.send(message.data); + } else if (message is _CloseCommand) { + _closeConnection(message.connectionId); + } + } + + Future _closeConnection(String connectionId) async { + _connections[connectionId]?.close(); + _connections.remove(connectionId); + + // Cancel all subscriptions for this connection + final subs = _subscriptions.remove(connectionId); + if (subs != null) { + for (final sub in subs) { + await sub.cancel(); + } + } + + // Clean up batch timer if no connections remain + if (_connections.isEmpty) { + _batchTimer?.cancel(); + _flushMessageQueue(); // Flush remaining messages + } + } + + void _connect(String connectionId, String url) async { + if (_connections.containsKey(connectionId)) { + // Already connected + print("connection with id $connectionId already exists"); + throw Exception("Connection with id $connectionId already exists"); + return; + } + final backoff = BinaryExponentialBackoff( + initial: Duration(seconds: 1), + maximumStep: 10, + ); + + final webSocket = WebSocket(Uri.parse(url), backoff: backoff); + _connections[connectionId] = webSocket; + + final subscriptions = []; + + final connectionSub = webSocket.connection.listen( + (state) { + if (state is Connected) { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.ready, + ), + ); + } else if (state is Reconnecting) { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.reconnecting, + ), + ); + } else if (state is Disconnected) { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: 'Disconnected', + ), + ); + } + }, + onError: (error) { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); + }, + ); + subscriptions.add(connectionSub); + + final messagesSub = webSocket.messages.listen( + (message) { + //? this is an expensive operation + final eventJson = json.decode(message); + final NostrMessageRaw data; + + switch (eventJson[0]) { + case 'NOTICE': + data = NostrMessageRaw( + type: NostrMessageRawType.notice, + otherData: eventJson, + ); + break; + case 'EVENT': + Nip01EventRaw? nip01Event; + try { + final eventData = eventJson[2]; + nip01Event = Nip01EventRaw( + id: eventData['id'], + pubKey: eventData['pubkey'], + createdAt: eventData['created_at'], + kind: eventData['kind'], + tags: List>.from( + (eventData['tags'] as List).map( + (tag) => List.from(tag), + ), + ), + content: eventData['content'], + sig: eventData['sig'], + ); + } catch (e) { + nip01Event = null; + } + + data = NostrMessageRaw( + type: NostrMessageRawType.event, + requestId: eventJson[1], + nip01Event: nip01Event, + otherData: nip01Event == null ? eventJson : null, + ); + + break; + case 'EOSE': + data = NostrMessageRaw( + type: NostrMessageRawType.eose, otherData: eventJson); + break; + case 'OK': + data = NostrMessageRaw( + type: NostrMessageRawType.ok, + otherData: eventJson, + ); + break; + case 'CLOSED': + data = NostrMessageRaw( + type: NostrMessageRawType.closed, + otherData: eventJson, + ); + break; + case 'AUTH': + data = NostrMessageRaw( + type: NostrMessageRawType.auth, + otherData: eventJson, + ); + break; + default: + data = NostrMessageRaw( + type: NostrMessageRawType.unknown, + otherData: eventJson, + ); + break; + } + + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.message, + data: data, + ), + ); + }, + onError: (error) { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.error, + error: error.toString(), + ), + ); + }, + onDone: () { + _queueMessage( + _IsolateMessage( + connectionId: connectionId, + type: _IsolateMessageType.done, + closeCode: null, + closeReason: "Done", + ), + ); + _closeConnection(connectionId); + }, + ); + subscriptions.add(messagesSub); + + _subscriptions[connectionId] = subscriptions; + } +} diff --git a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart index 7cab47aeb..dd6814689 100644 --- a/packages/ndk/lib/domain_layer/entities/nip_01_event.dart +++ b/packages/ndk/lib/domain_layer/entities/nip_01_event.dart @@ -135,6 +135,7 @@ class Nip01Event { static String _calculateId(String publicKey, int createdAt, int kind, List tags, String content) { + print("Calculating id with: $publicKey, $createdAt, $kind, $tags"); final jsonData = json.encode([0, publicKey, createdAt, kind, tags, content]); final bytes = utf8.encode(jsonData); diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart index e3e55bc22..aad24caf5 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_request_strategies/relay_jit_pubkey_strategy.dart @@ -34,7 +34,7 @@ import '../../user_relay_lists/user_relay_lists.dart'; /// class RelayJitPubkeyStrategy with Logger { - static void handleRequest({ + static Future handleRequest({ required RequestState requestState, required GlobalState globalState, @@ -51,7 +51,7 @@ class RelayJitPubkeyStrategy with Logger { required ReadWriteMarker direction, required List ignoreRelays, required RelayManager relayManager, - }) { + }) async { List combindedPubkeys = [ ...?filter.authors, ...?filter.pTags @@ -60,16 +60,16 @@ class RelayJitPubkeyStrategy with Logger { // init coveragePubkeys List coveragePubkeys = []; - for (var pubkey in combindedPubkeys) { + for (final pubkey in combindedPubkeys) { coveragePubkeys .add(CoveragePubkey(pubkey, desiredCoverage, desiredCoverage)); } // look for connected relays that cover the pubkey - for (var connectedRelay in connectedRelays) { - var coveredPubkeysForRelay = []; + for (final connectedRelay in connectedRelays) { + final coveredPubkeysForRelay = []; - for (var coveragePubkey in coveragePubkeys) { + for (final coveragePubkey in coveragePubkeys) { if (JitEngine.doesRelayCoverPubkey( connectedRelay, coveragePubkey.pubkey, direction)) { coveredPubkeysForRelay.add(coveragePubkey.pubkey); @@ -93,18 +93,18 @@ class RelayJitPubkeyStrategy with Logger { globalState, relayManager, ); - - // clear out fully covered pubkeys - _removeFullyCoveredPubkeys(coveragePubkeys); } + // clear out fully covered pubkeys after processing all connected relays + _removeFullyCoveredPubkeys(coveragePubkeys); + if (coveragePubkeys.isEmpty) { // we are done // all pubkeys are covered by already connected relays return; } - _findRelaysForUnresolvedPubkeys( + await _findRelaysForUnresolvedPubkeys( requestState: requestState, globalState: globalState, relayManger: relayManager, @@ -141,7 +141,7 @@ class RelayJitPubkeyStrategy with Logger { // looks in nip65 data for not covered pubkeys // the result is relay candidates // connects to these candidates and sends out the request - static void _findRelaysForUnresolvedPubkeys({ + static Future _findRelaysForUnresolvedPubkeys({ required RelayManager relayManger, required RequestState requestState, required GlobalState globalState, diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index cacfb9cfc..38446a138 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -6,8 +6,10 @@ import 'package:rxdart/rxdart.dart'; import '../../config/bootstrap_relays.dart'; import '../../config/relay_defaults.dart'; +import '../../data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart'; import '../../shared/helpers/relay_helper.dart'; import '../../shared/logger/logger.dart'; +import '../../shared/logger/log_level.dart'; import '../../shared/nips/nip01/client_msg.dart'; import '../entities/broadcast_state.dart'; import '../entities/connection_source.dart'; @@ -55,6 +57,9 @@ class RelayManager { Stream> get relayConnectivityChanges => _relayUpdatesStreamController.stream; + Timer? _updateRelayConnectivityTimer; + bool _pendingRelayUpdate = false; + /// Creates a new relay manager. RelayManager({ required this.globalState, @@ -69,6 +74,19 @@ class RelayManager { } void updateRelayConnectivity() { + if (_pendingRelayUpdate) return; + + _pendingRelayUpdate = true; + _updateRelayConnectivityTimer?.cancel(); + _updateRelayConnectivityTimer = Timer(Duration(milliseconds: 100), () { + _pendingRelayUpdate = false; + _relayUpdatesStreamController.add(globalState.relays); + }); + } + + void updateRelayConnectivityImmediate() { + _updateRelayConnectivityTimer?.cancel(); + _pendingRelayUpdate = false; _relayUpdatesStreamController.add(globalState.relays); } @@ -264,18 +282,18 @@ class RelayManager { } void reSubscribeInFlightSubscriptions(RelayConnectivity relayConnectivity) { + final relayUrl = relayConnectivity.url; globalState.inFlightRequests.forEach((key, state) { - state.requests.values - .where((req) => req.url == relayConnectivity.url) - .forEach((req) { - if (!state.request.closeOnEOSE) { - List list = ["REQ", state.id]; - list.addAll(req.filters.map((filter) => filter.toMap())); - - relayConnectivity.stats.activeRequests++; - _sendRaw(relayConnectivity, jsonEncode(list)); - } - }); + if (state.request.closeOnEOSE) return; // Skip early + + final req = state.requests[relayUrl]; + if (req != null) { + List list = ["REQ", state.id]; + list.addAll(req.filters.map((filter) => filter.toMap())); + + relayConnectivity.stats.activeRequests++; + _sendRaw(relayConnectivity, jsonEncode(list)); + } }); } @@ -293,6 +311,8 @@ class RelayManager { /// wait until rdy await relayConnectivity.relayTransport!.ready; + //await Future.delayed(Duration(seconds: 10)); + final String encodedMsg = jsonEncode(msg.toJson()); _sendRaw(relayConnectivity, encodedMsg); } @@ -390,16 +410,16 @@ class RelayManager { void _handleIncomingMessage( dynamic message, RelayConnectivity relayConnectivity) { - List eventJson; - try { - eventJson = json.decode(message); - } on FormatException catch (e) { - Logger.log.e( - "FormatException in _handleIncomingMessage for relay ${relayConnectivity.url}: $e, message: $message"); + if (message is! NostrMessageRaw) { + Logger.log.w( + "Received non NostrMessageRaw message from ${relayConnectivity.url}: $message"); return; } - if (eventJson[0] == 'OK') { + final myMsg = message; + final eventJson = myMsg.otherData; + + if (myMsg.type == NostrMessageRawType.ok) { //nip 20 used to notify clients if an EVENT was successful if (eventJson.length >= 2 && eventJson[2] == false) { Logger.log.e("NOT OK from ${relayConnectivity.url}: $eventJson"); @@ -421,22 +441,22 @@ class RelayManager { } return; } - if (eventJson[0] == 'NOTICE') { + if (myMsg.type == NostrMessageRawType.notice) { Logger.log.w("NOTICE from ${relayConnectivity.url}: ${eventJson[1]}"); _logActiveRequests(); - } else if (eventJson[0] == 'EVENT') { + } else if (myMsg.type == NostrMessageRawType.event) { _handleIncomingEvent( - eventJson, relayConnectivity, message.toString().codeUnits.length); + myMsg, relayConnectivity, message.toString().codeUnits.length); Logger.log.t("EVENT from ${relayConnectivity.url}: $eventJson"); - } else if (eventJson[0] == 'EOSE') { + } else if (myMsg.type == NostrMessageRawType.eose) { Logger.log.d("EOSE from ${relayConnectivity.url}: ${eventJson[1]}"); _handleEOSE(eventJson, relayConnectivity); - } else if (eventJson[0] == 'CLOSED') { + } else if (myMsg.type == NostrMessageRawType.closed) { Logger.log.w( " CLOSED subscription url: ${relayConnectivity.url} id: ${eventJson[1]} msg: ${eventJson.length > 2 ? eventJson[2] : ''}"); _handleClosed(eventJson, relayConnectivity); } - if (eventJson[0] == ClientMsgType.kAuth) { + if (myMsg.type == NostrMessageRawType.auth) { // nip 42 used to send authentication challenges final challenge = eventJson[1]; Logger.log.d("AUTH: $challenge"); @@ -463,23 +483,35 @@ class RelayManager { // } } - void _handleIncomingEvent(List eventJson, - RelayConnectivity connectivity, int messageSize) { - var id = eventJson[1]; - if (globalState.inFlightRequests[id] == null) { + void _handleIncomingEvent( + NostrMessageRaw nostrMsgRaw, + RelayConnectivity connectivity, + int messageSize, + ) { + final requestId = nostrMsgRaw.requestId!; + final eventRaw = nostrMsgRaw.nip01Event!; + if (globalState.inFlightRequests[requestId] == null) { Logger.log.w( - "RECEIVED EVENT from ${connectivity.url} for id $id, not in globalState inFlightRequests. Likely data after EOSE on a query"); + "RECEIVED EVENT from ${connectivity.url} for id $requestId, not in globalState inFlightRequests. Likely data after EOSE on a query"); return; } - Nip01Event event = Nip01Event.fromJson(eventJson[2]); + Nip01Event event = Nip01Event( + pubKey: eventRaw.pubKey, + createdAt: eventRaw.createdAt, + kind: eventRaw.kind, + tags: eventRaw.tags, + content: eventRaw.content, + ); + event.sig = eventRaw.sig; + event.id = eventRaw.id; connectivity.stats.incStatsByNewEvent(event, messageSize); - RequestState? state = globalState.inFlightRequests[id]; + RequestState? state = globalState.inFlightRequests[requestId]; if (state != null) { RelayRequestState? request = state.requests[connectivity.url]; if (request == null) { - Logger.log.w("No RelayRequestState found for id $id"); + Logger.log.w("No RelayRequestState found for id $requestId"); return; } event.sources.add(connectivity.url); @@ -546,15 +578,13 @@ class RelayManager { /// check if relays for this request are still connected /// if not ignore it and wait for the ones still alive to receive EOSE - final listOfRelaysForThisRequest = state.requests.keys.toList(); - final myNotConnectedRelays = globalState.relays.keys - .where((url) => listOfRelaysForThisRequest.contains(url)) - .where((url) => !isRelayConnected(url)) - .toList(); + final requestRelayUrls = state.requests.keys.toSet(); + final notConnectedRelays = + requestRelayUrls.where((url) => !isRelayConnected(url)).toSet(); final bool didAllRelaysFinish = state.requests.values.every( (element) => - element.receivedEOSE || myNotConnectedRelays.contains(element.url), + element.receivedEOSE || notConnectedRelays.contains(element.url), ); if (didAllRelaysFinish) { @@ -581,22 +611,14 @@ class RelayManager { } void _logActiveRequests() { - // Map kindsMap = {}; + // Skip expensive iteration if debug logging is not enabled + if (!LogLevel.debug.shouldLog(Logger.log.level)) return; + Map namesMap = {}; globalState.inFlightRequests.forEach((key, state) { - // int? kind; - // if (state.requests.isNotEmpty && - // state.requests.values.first.filters.first.kinds != null && - // state.requests.values.first.filters.first.kinds!.isNotEmpty) { - // kind = state.requests.values.first.filters.first.kinds!.first; - // } - // int? kindCount = kindsMap[kind]; int? nameCount = namesMap[state.request.name]; - // kindCount ??= 0; - // kindCount++; nameCount ??= 0; nameCount++; - // kindsMap[kind] = kindCount; namesMap[state.request.name] = nameCount; }); Logger.log.d(