diff --git a/lib/src/realtime_mixin.dart b/lib/src/realtime_mixin.dart index 389acea8..c230b978 100644 --- a/lib/src/realtime_mixin.dart +++ b/lib/src/realtime_mixin.dart @@ -1,14 +1,16 @@ import 'dart:async'; import 'dart:convert'; + import 'package:flutter/foundation.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/status.dart' as status; -import 'exception.dart'; -import 'realtime_subscription.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + import 'client.dart'; +import 'exception.dart'; import 'realtime_message.dart'; import 'realtime_response.dart'; import 'realtime_response_connected.dart'; +import 'realtime_subscription.dart'; typedef WebSocketFactory = Future Function(Uri uri); typedef GetFallbackCookie = String? Function(); @@ -87,10 +89,14 @@ mixin RealtimeMixin { break; } }, onDone: () { - final subscriptions = List.from(_subscriptions.values); - for (var subscription in subscriptions) { - subscription.close(); + if (!_notifyDone || _creatingSocket) return; + // create a copy of the keys to avoid concurrent modification + var subscriptionKeys = _subscriptions.keys.toList(); + for (var key in subscriptionKeys) { + // close the subscription + _subscriptions[key]?.close(); } + _channels.clear(); _closeConnection(); }, onError: (err, stack) { @@ -157,13 +163,23 @@ mixin RealtimeMixin { } void _cleanup(List channels) { + // create a list of channels to remove from the list of channels + List channelsToRemove = []; + for (var channel in channels) { + // check if the channel is still in use bool found = _subscriptions.values .any((subscription) => subscription.channels.contains(channel)); + if (!found) { - _channels.remove(channel); + // if not, add it to the list of channels to remove + channelsToRemove.add(channel); } } + + for (var channel in channelsToRemove) { + _channels.remove(channel); + } } void handleError(RealtimeResponse response) {