Skip to content
48 changes: 48 additions & 0 deletions packages/ndk/lib/data_layer/data_sources/websocket_channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import 'dart:async';

import 'package:web_socket_channel/web_socket_channel.dart';

class WebsocketChannelDS {
final WebSocketChannel ws;
final String url;

final Completer<void> _readyCompleter = Completer<void>();

WebsocketChannelDS(this.ws, this.url) {
ws.ready.then((_) {
if (!_readyCompleter.isCompleted) {
_readyCompleter.complete();
}
});
}

StreamSubscription<dynamic> listen(
void Function(dynamic) onData, {
Function? onError,
void Function()? onDone,
}) {
return ws.stream.listen(onData, onDone: onDone, onError: onError);
}

void send(dynamic data) {
return ws.sink.add(data);
}

Future<void> close() {
return ws.sink.close();
}

bool isOpen() {
final rdy = _readyCompleter.isCompleted;
final notClosed = ws.closeCode == null;
return rdy && notClosed;
}

int? closeCode() {
return ws.closeCode;
}

String? closeReason() {
return ws.closeReason;
}
}
14 changes: 6 additions & 8 deletions packages/ndk/lib/data_layer/data_sources/websocket_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,18 @@ class WebsocketDSClient {
}

bool isOpen() {
return ws.connection.state == Connected() ||
ws.connection.state == Reconnected();
final state = ws.connection.state;
return state is Connected || state is Reconnected;
}

int? closeCode() {
return ws.connection.state == Disconnected()
? (ws.connection.state as Disconnected).code
: null;
final state = ws.connection.state;
return state is Disconnected ? state.code : null;
}

String? closeReason() {
return ws.connection.state == Disconnected()
? (ws.connection.state as Disconnected).reason
: null;
final state = ws.connection.state;
return state is Disconnected ? state.reason : null;
}
}
// coverage:ignore-end
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:web_socket_client/web_socket_client.dart';

import '../../../domain_layer/repositories/nostr_transport.dart';
import '../../../shared/logger/logger.dart';
import '../../data_sources/websocket_channel.dart';
import '../../data_sources/websocket_client.dart';

/// A WebSocket-based implementation of the NostrTransport interface.
Expand All @@ -22,28 +23,24 @@ class WebSocketClientNostrTransport implements NostrTransport {
WebSocketClientNostrTransport(this._websocketDS, [Function? onReconnect]) {
Completer completer = Completer();
ready = completer.future;

_stateStreamSubscription = _websocketDS.ws.connection.listen((state) {
Logger.log.t("${_websocketDS.url} connection state changed to $state");
switch (state) {
case Connected() || Reconnected():
if (state is Connected || state is Reconnected) {
if (!completer.isCompleted) {
completer.complete();
if (state == Reconnected() && onReconnect != null) {
onReconnect.call();
}
break;
case Disconnected():
completer = Completer();
ready = completer.future;
break;
case Connecting():
// Do nothing, just waiting for connection to be established
break;
case Reconnecting():
// Do nothing, just waiting for reconnection to be established
break;
default:
Logger.log.w(
"${_websocketDS.url} connection state changed to unknown state: $state");
}
if (state is Reconnected && onReconnect != null) {
onReconnect.call();
}
} else if (state is Disconnected) {
completer = Completer();
ready = completer.future;
} else if (state is Connecting || state is Reconnecting) {
// Do nothing, just waiting for (re)connection to be established
} else {
Logger.log.w(
"${_websocketDS.url} connection state changed to unknown state: $state");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import 'package:ndk/data_layer/repositories/nostr_transport/websocket_client_nostr_transport.dart';
import 'package:ndk/data_layer/repositories/nostr_transport/websocket_isolate/websocket_isolate_nostr_transport.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_client/web_socket_client.dart';

import '../../../domain_layer/repositories/nostr_transport.dart';
import '../../../shared/helpers/relay_helper.dart';
import '../../data_sources/websocket_channel.dart';
import '../../data_sources/websocket_client.dart';

class WebSocketClientNostrTransportFactory implements NostrTransportFactory {
final bool useIsolate;

WebSocketClientNostrTransportFactory({this.useIsolate = true});

@override
NostrTransport call(String url, Function? onReconnect) {
final myUrl = cleanRelayUrl(url);
Expand All @@ -14,6 +21,12 @@ class WebSocketClientNostrTransportFactory implements NostrTransportFactory {
throw Exception("relayUrl is not parsable");
}

// Use isolate-based transport for better performance
if (useIsolate) {
return WebSocketIsolateNostrTransport(myUrl, onReconnect);
}

// Fallback to regular transport
final backoff = BinaryExponentialBackoff(
initial: Duration(seconds: 1), maximumStep: 10);
final client = WebSocket(Uri.parse(myUrl), backoff: backoff);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
part of 'websocket_isolate_nostr_transport.dart';

/// Message types for isolate communication
enum _IsolateMessageType {
ready,
reconnecting,
message,
error,
done,
}

/// Internal message class for communication between main isolate and worker isolate
class _IsolateMessage {
/// connection id is the cleaned relay url, (needed so reconnect, restore state works)
final String connectionId;
final _IsolateMessageType type;
final NostrMessageRaw? data;
final String? error;
final int? closeCode;
final String? closeReason;

_IsolateMessage({
required this.connectionId,
required this.type,
this.data,
this.error,
this.closeCode,
this.closeReason,
});
}

/// Base class for commands sent from main isolate to worker isolate
abstract class _IsolateCommand {
final String connectionId;

_IsolateCommand({required this.connectionId});
}

class _ConnectCommand extends _IsolateCommand {
final String url;

_ConnectCommand({required super.connectionId, required this.url});
}

class _SendCommand extends _IsolateCommand {
final dynamic data;

_SendCommand({required super.connectionId, required this.data});
}

class _CloseCommand extends _IsolateCommand {
_CloseCommand({required super.connectionId});
}

enum NostrMessageRawType {
notice,
event,
eose,
ok,
closed,
auth,
unknown,
}

//? needed until Nip01Event is refactored to be immutable
class Nip01EventRaw {
final String id;

final String pubKey;

final int createdAt;

final int kind;

final List<List<String>> tags;

final String content;

final String sig;

Nip01EventRaw({
required this.id,
required this.pubKey,
required this.createdAt,
required this.kind,
required this.tags,
required this.content,
required this.sig,
});
}

class NostrMessageRaw {
final NostrMessageRawType type;
final Nip01EventRaw? nip01Event;
final String? requestId;
final dynamic otherData;

NostrMessageRaw({
required this.type,
this.nip01Event,
this.requestId,
this.otherData,
});
}
Loading
Loading