diff --git a/electrum_nmc/electrum/commands.py b/electrum_nmc/electrum/commands.py
index 442251b71022..b7fc3f26be39 100644
--- a/electrum_nmc/electrum/commands.py
+++ b/electrum_nmc/electrum/commands.py
@@ -303,12 +303,12 @@ async def make_seed(self, nbits=132, language=None, seed_type=None):
         return s
 
     @command('n')
-    async def getaddresshistory(self, address):
+    async def getaddresshistory(self, address, stream_id=None):
         """Return the transaction history of any address. Note: This is a
         walletless server query, results are not checked by SPV.
         """
         sh = bitcoin.address_to_scripthash(address)
-        return await self.network.get_history_for_scripthash(sh)
+        return await self.network.get_history_for_scripthash(sh, stream_id=stream_id)
 
     @command('w')
     async def listunspent(self, wallet: Abstract_Wallet = None):
@@ -372,12 +372,12 @@ async def name_list(self, identifier=None):
         return result
 
     @command('n')
-    async def getaddressunspent(self, address):
+    async def getaddressunspent(self, address, stream_id=None):
         """Returns the UTXO list of any address. Note: This is a
         walletless server query, results are not checked by SPV.
         """
         sh = bitcoin.address_to_scripthash(address)
-        return await self.network.listunspent_for_scripthash(sh)
+        return await self.network.listunspent_for_scripthash(sh, stream_id=stream_id)
 
     @command('')
     async def serialize(self, jsontx):
@@ -428,10 +428,10 @@ async def deserialize(self, tx):
         return tx.deserialize(force_full_parse=True)
 
     @command('n')
-    async def broadcast(self, tx):
+    async def broadcast(self, tx, stream_id=None):
         """Broadcast a transaction to the network. """
         tx = Transaction(tx)
-        await self.network.broadcast_transaction(tx)
+        await self.network.broadcast_transaction(tx, stream_id=stream_id)
         return tx.txid()
 
     @command('')
@@ -497,21 +497,21 @@ async def getbalance(self, wallet: Abstract_Wallet = None):
         return out
 
     @command('n')
-    async def getaddressbalance(self, address):
+    async def getaddressbalance(self, address, stream_id=None):
         """Return the balance of any address. Note: This is a walletless
         server query, results are not checked by SPV.
         """
         sh = bitcoin.address_to_scripthash(address)
-        out = await self.network.get_balance_for_scripthash(sh)
+        out = await self.network.get_balance_for_scripthash(sh, stream_id=stream_id)
         out["confirmed"] = str(Decimal(out["confirmed"])/COIN)
         out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN)
         return out
 
     @command('n')
-    async def getmerkle(self, txid, height):
+    async def getmerkle(self, txid, height, stream_id=None):
         """Get Merkle branch of a transaction included in a block. Electrum
         uses this to verify transactions (Simple Payment Verification)."""
-        return await self.network.get_merkle_for_transaction(txid, int(height))
+        return await self.network.get_merkle_for_transaction(txid, int(height), stream_id=stream_id)
 
     @command('n')
     async def getservers(self):
@@ -689,12 +689,12 @@ async def paytomany(self, outputs, fee=None, feerate=None, from_addr=None, from_
         return tx.as_dict()
 
     @command('wp')
-    async def name_new(self, identifier, destination=None, amount=0.0, fee=None, from_addr=None, change_addr=None, nocheck=False, unsigned=False, rbf=None, password=None, locktime=None, allow_existing=False):
+    async def name_new(self, identifier, destination=None, amount=0.0, fee=None, from_addr=None, change_addr=None, nocheck=False, unsigned=False, rbf=None, password=None, locktime=None, allow_existing=False, stream_id=None):
         """Create a name_new transaction. """
         if not allow_existing:
             name_exists = True
             try:
-                show = self.name_show(identifier)
+                show = self.name_show(identifier, stream_id=stream_id)
             except NameNotFoundError:
                 name_exists = False
             if name_exists:
@@ -772,7 +772,7 @@ async def name_update(self, identifier, value=None, destination=None, amount=0.0
         return tx.as_dict()
 
     @command('wpn')
-    async def name_autoregister(self, identifier, value, destination=None, amount=0.0, fee=None, from_addr=None, change_addr=None, nocheck=False, rbf=None, password=None, locktime=None, allow_existing=False):
+    async def name_autoregister(self, identifier, value, destination=None, amount=0.0, fee=None, from_addr=None, change_addr=None, nocheck=False, rbf=None, password=None, locktime=None, allow_existing=False, stream_id=None):
         """Creates a name_new transaction, broadcasts it, creates a
         corresponding name_firstupdate transaction, and queues it. """
         # Validate the value before we try to pre-register the name.  That way,
@@ -781,12 +781,12 @@ async def name_autoregister(self, identifier, value, destination=None, amount=0.
         validate_value_length(value)
 
         # TODO: Don't hardcode the 0.005 name_firstupdate fee
-        new_result = self.name_new(identifier, amount=amount+0.005, fee=fee, from_addr=from_addr, change_addr=change_addr, nocheck=nocheck, rbf=rbf, password=password, locktime=locktime, allow_existing=allow_existing)
+        new_result = self.name_new(identifier, amount=amount+0.005, fee=fee, from_addr=from_addr, change_addr=change_addr, nocheck=nocheck, rbf=rbf, password=password, locktime=locktime, allow_existing=allow_existing, stream_id=stream_id)
         new_txid = new_result["txid"]
         new_rand = new_result["rand"]
         new_tx = new_result["tx"]["hex"]
 
-        self.broadcast(new_tx)
+        self.broadcast(new_tx, stream_id=stream_id)
 
         # We add the name_new transaction to the wallet explicitly because
         # otherwise, the wallet will only learn about the name_new once the
@@ -882,13 +882,13 @@ async def listaddresses(self, receiving=False, change=False, labels=False, froze
         return out
 
     @command('n')
-    async def gettransaction(self, txid, wallet: Abstract_Wallet = None):
+    async def gettransaction(self, txid, stream_id=None, wallet: Abstract_Wallet = None):
         """Retrieve a transaction. """
         tx = None
         if wallet:
             tx = wallet.db.get_transaction(txid)
         if tx is None:
-            raw = await self.network.get_transaction(txid)
+            raw = await self.network.get_transaction(txid, stream_id=stream_id)
             if raw:
                 tx = Transaction(raw)
             else:
@@ -1043,7 +1043,9 @@ async def updatequeuedtransactions(self):
                 if trigger_name is not None:
                     # TODO: handle non-ASCII trigger_name
                     try:
-                        current_height = self.name_show(trigger_name)["height"]
+                        # TODO: Store a stream ID in the queue, so that we can be
+                        # more intelligent than using the txid.
+                        current_height = self.name_show(trigger_name, stream_id="txid: " + txid)["height"]
                         current_depth = chain_height - current_height + 1
                     except NameNotFoundError:
                         current_depth = 36000
@@ -1056,7 +1058,9 @@
             if current_depth >= trigger_depth:
                 tx = queue_item["tx"]
                 try:
-                    self.broadcast(tx)
+                    # TODO: Store a stream ID in the queue, so that we can be
+                    # more intelligent than using the txid.
+                    self.broadcast(tx, stream_id="txid: " + txid)
                 except Exception as e:
                     errors[txid] = str(e)
 
@@ -1128,12 +1132,12 @@ async def getfeerate(self, fee_method=None, fee_level=None):
         return self.config.fee_per_kb(dyn=dyn, mempool=mempool, fee_level=fee_level)
 
     @command('n')
-    async def name_show(self, identifier):
+    async def name_show(self, identifier, stream_id=None):
         # TODO: support non-ASCII encodings
         identifier_bytes = identifier.encode("ascii")
         sh = name_identifier_to_scripthash(identifier_bytes)
-        txs = self.network.run_from_another_thread(self.network.get_history_for_scripthash(sh))
+        txs = self.network.run_from_another_thread(self.network.get_history_for_scripthash(sh, stream_id=stream_id))
 
         # Pick the most recent name op that's [12, 36000) confirmations.
         chain_height = self.network.blockchain().height()
 
@@ -1156,10 +1160,10 @@ async def name_show(self, identifier):
             header = self.network.blockchain().read_header(height)
             if header is None:
                 if height < constants.net.max_checkpoint():
-                    self.network.run_from_another_thread(self.network.request_chunk(height, None))
+                    self.network.run_from_another_thread(self.network.request_chunk(height, None, stream_id=stream_id))
 
             # (from verifier._request_and_verify_single_proof)
-            merkle = self.network.run_from_another_thread(self.network.get_merkle_for_transaction(txid, height))
+            merkle = self.network.run_from_another_thread(self.network.get_merkle_for_transaction(txid, height, stream_id=stream_id))
             if height != merkle.get('block_height'):
                 raise Exception('requested height {} differs from received height {} for txid {}'
                                 .format(height, merkle.get('block_height'), txid))
@@ -1177,7 +1181,7 @@ async def wait_for_header():
             if self.wallet and txid in self.wallet.db.transactions:
                 tx = self.wallet.db.transactions[txid]
             else:
-                raw = self.network.run_from_another_thread(self.network.get_transaction(txid))
+                raw = self.network.run_from_another_thread(self.network.get_transaction(txid, stream_id=stream_id))
                 if raw:
                     tx = Transaction(raw)
                 else:
@@ -1390,6 +1394,7 @@ def eval_bool(x: str) -> bool:
     'fee_level': (None, "Float between 0.0 and 1.0, representing fee slider position"),
     'from_height': (None, "Only show transactions that confirmed after given block height"),
     'to_height': (None, "Only show transactions that confirmed before given block height"),
+    'stream_id': (None, "Stream-isolate the network connection using this stream ID (only used with Tor)"),
     'destination': (None, "Namecoin address, contact or alias"),
     'amount': (None, "Amount to be sent (in NMC). Type \'!\' to send the maximum available."),
     'allow_existing': (None, "Allow pre-registering a name that already is registered. Your registration fee will be forfeited until you can register the name after it expires."),
diff --git a/electrum_nmc/electrum/interface.py b/electrum_nmc/electrum/interface.py
index 0471a720becd..a4c9c47112b5 100644
--- a/electrum_nmc/electrum/interface.py
+++ b/electrum_nmc/electrum/interface.py
@@ -709,6 +709,31 @@ def do_bucket():
         return self._ipaddr_bucket
 
 
+class InterfaceSecondary(Interface):
+    """An Interface that doesn't try to fetch blocks, and instead stays idle
+    until it's explicitly used for something."""
+
+    async def ping(self):
+        # Since InterfaceSecondary doesn't ping periodically once it becomes
+        # dirty, it will time out if the user stops using it.  That's good,
+        # since otherwise we'd accumulate a giant pile of secondary interfaces
+        # for stream IDs that aren't in use anymore.
+        while True:
+            await asyncio.sleep(300)
+            if self not in self.network.interfaces_clean.values():
+                break
+            await self.session.send_request('server.ping')
+
+    async def run_fetch_blocks(self):
+        if self.ready.cancelled():
+            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
+        if self.ready.done():
+            return
+
+        # Without this, the Interface will think the connection timed out.
+        self.ready.set_result(1)
+
+
 def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
     chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
     if chain_bad:
diff --git a/electrum_nmc/electrum/network.py b/electrum_nmc/electrum/network.py
index e91458fd3805..c72871c4f92e 100644
--- a/electrum_nmc/electrum/network.py
+++ b/electrum_nmc/electrum/network.py
@@ -52,7 +52,7 @@
 from . import blockchain
 from . import bitcoin
 from .blockchain import Blockchain, HEADER_SIZE
-from .interface import (Interface, serialize_server, deserialize_server,
+from .interface import (Interface, InterfaceSecondary, serialize_server, deserialize_server,
                         RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
                         NetworkException)
 from .version import PROTOCOL_VERSION
@@ -291,9 +291,13 @@ def __init__(self, config: SimpleConfig):
         self.interface = None  # type: Interface
         # set of servers we have an ongoing connection with
         self.interfaces = {}  # type: Dict[str, Interface]
+        self.interfaces_clean = {}  # type: Dict[str, Interface]
+        self.interfaces_for_stream_ids = {}  # type: Dict[str, InterfaceSecondary]
         self.auto_connect = self.config.get('auto_connect', True)
         self.connecting = set()
+        self.connecting_clean = set()
         self.server_queue = None
+        self.server_queue_clean = None
         self.proxy = None
 
         # Dump network messages (all interfaces). Set at runtime from the console.
@@ -497,6 +501,46 @@ def get_interfaces(self) -> List[str]:
         with self.interfaces_lock:
             return list(self.interfaces)
 
+    def get_interfaces_clean(self) -> List[str]:
+        """The list of servers for connected interfaces that are ready for
+        future use by stream-isolated operations."""
+        with self.interfaces_lock:
+            return list(self.interfaces_clean)
+
+    def get_interface_for_stream_id(self, stream_id) -> Optional[Interface]:
+        """If a proxy is enabled, returns an Interface using a TCP connection
+        that is guaranteed to be unique per stream_id.  Otherwise, just
+        returns the main Interface.  Useful for improved privacy with Tor
+        stream isolation."""
+
+        # Default to self.interface if no stream ID was provided
+        if stream_id is None:
+            return self.interface
+
+        # If no anonymizing proxy is in use, then using multiple servers will
+        # probably do more harm than good.
+        if self.proxy is None:
+            return self.interface
+
+        try:
+            # Reuse an existing interface if it already exists for this stream
+            # ID.
+            with self.interfaces_lock:
+                interface = self.interfaces_for_stream_ids[stream_id]
+        except KeyError:
+            # An interface doesn't exist for this stream ID, so pick an
+            # interface from the clean pool.
+            with self.interfaces_lock:
+                ready_servers = [server for server in self.interfaces_clean if self.interfaces_clean[server].ready.done()]
+                if len(ready_servers) == 0:
+                    return None
+                server = random.choice(ready_servers)
+                interface = self.interfaces_clean[server]
+                self.interfaces_for_stream_ids[stream_id] = interface
+                del self.interfaces_clean[server]
+
+        return interface
+
     @with_recent_servers_lock
     def get_servers(self):
         # note: order of sources when adding servers here is crucial!
@@ -532,6 +576,11 @@ def _start_interface(self, server: str):
             self.connecting.add(server)
             self.server_queue.put(server)
 
+    def _start_interface_clean(self, server: str):
+        if server not in self.interfaces_clean and server not in self.connecting_clean:
+            self.connecting_clean.add(server)
+            self.server_queue_clean.put(server)
+
     def _start_random_interface(self):
         with self.interfaces_lock:
             exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting
@@ -540,6 +589,14 @@ def _start_random_interface(self):
             self._start_interface(server)
         return server
 
+    def _start_random_interface_clean(self):
+        with self.interfaces_lock:
+            exclude_set = self.disconnected_servers_clean | set(self.interfaces_clean) | self.connecting_clean
+        server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
+        if server:
+            self._start_interface_clean(server)
+        return server
+
     def _set_proxy(self, proxy: Optional[dict]):
         self.proxy = proxy
         # Store these somewhere so we can un-monkey-patch
@@ -734,11 +791,24 @@ async def switch_to_interface(self, server: str):
 
     async def _close_interface(self, interface):
         if interface:
-            with self.interfaces_lock:
-                if self.interfaces.get(interface.server) == interface:
-                    self.interfaces.pop(interface.server)
-            if interface.server == self.default_server:
-                self.interface = None
+            if isinstance(interface, InterfaceSecondary):
+                with self.interfaces_lock:
+                    # If it's a dirty secondary interface, then remove it for
+                    # the stream ID that it's associated with.
+                    stream_ids = list(self.interfaces_for_stream_ids)
+                    for stream_id in stream_ids:
+                        if self.interfaces_for_stream_ids[stream_id] == interface:
+                            self.interfaces_for_stream_ids.pop(stream_id)
+                    # If it's a clean secondary interface, then remove it from
+                    # the clean interface pool.
+                    if self.interfaces_clean.get(interface.server) == interface:
+                        self.interfaces_clean.pop(interface.server)
+            else:
+                with self.interfaces_lock:
+                    if self.interfaces.get(interface.server) == interface:
+                        self.interfaces.pop(interface.server)
+                if interface.server == self.default_server:
+                    self.interface = None
             await interface.close()
 
     @with_recent_servers_lock
@@ -754,10 +824,16 @@ async def connection_down(self, interface: Interface):
         '''A connection to server either went down, or was never made.
         We distinguish by whether it is in self.interfaces.'''
         if not interface: return
+
         server = interface.server
-        self.disconnected_servers.add(server)
-        if server == self.default_server:
-            self._set_status('disconnected')
+        if isinstance(interface, InterfaceSecondary):
+            if interface in self.interfaces_clean.values():
+                self.disconnected_servers_clean.add(server)
+        else:
+            self.disconnected_servers.add(server)
+            if server == self.default_server:
+                self._set_status('disconnected')
+
         await self._close_interface(interface)
         self.trigger_callback('network_updated')
 
@@ -794,7 +870,30 @@ async def _run_new_interface(self, server):
         self._add_recent_server(server)
         self.trigger_callback('network_updated')
 
-    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool:
+    @ignore_exceptions  # do not kill main_taskgroup
+    @log_exceptions
+    async def _run_new_interface_clean(self, server):
+        interface = InterfaceSecondary(self, server, self.proxy)
+        # note: using longer timeouts here as DNS can sometimes be slow!
+        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
+        try:
+            await asyncio.wait_for(interface.ready, timeout)
+        except BaseException as e:
+            self.logger.info(f"couldn't launch secondary iface {server} -- {repr(e)}")
+            await interface.close()
+            return
+        else:
+            with self.interfaces_lock:
+                assert server not in self.interfaces_clean
+                self.interfaces_clean[server] = interface
+        finally:
+            try: self.connecting_clean.remove(server)
+            except KeyError: pass
+
+        self._add_recent_server(server)
+        self.trigger_callback('network_updated')
+
+    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check, check_clean_pool=False) -> bool:
         # main interface is exempt. this makes switching servers easier
         if iface_to_check.is_main_server():
             return True
@@ -802,7 +901,10 @@ def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_c
             return True
         # bucket connected interfaces
         with self.interfaces_lock:
-            interfaces = list(self.interfaces.values())
+            if not check_clean_pool:
+                interfaces = list(self.interfaces.values())
+            else:
+                interfaces = list(self.interfaces_clean.values())
         if iface_to_check in interfaces:
             interfaces.remove(iface_to_check)
         buckets = defaultdict(list)
@@ -833,10 +935,27 @@ async def _init_headers_file(self):
             with b.lock:
                 b.update_size()
 
+    def stream_isolated(func):
+        async def wrapper(self, *args, **kwargs):
+            if 'stream_id' in kwargs:
+                kwargs['interface'] = self.get_interface_for_stream_id(kwargs['stream_id'])
+                if kwargs['interface'] is None:
+                    raise Exception("No clean interface is ready")
+                kwargs.pop('stream_id')
+            elif 'interface' not in kwargs:
+                kwargs['interface'] = self.interface
+
+            return await func(self, *args, **kwargs)
+
+        return wrapper
+
     def best_effort_reliable(func):
         async def make_reliable_wrapper(self, *args, **kwargs):
             for i in range(10):
-                iface = self.interface
+                if 'stream_id' in kwargs and kwargs['stream_id'] is not None:
+                    iface = self.get_interface_for_stream_id(kwargs['stream_id'])
+                else:
+                    iface = self.interface
                 # retry until there is a main interface
                 if not iface:
                     await asyncio.sleep(0.1)
@@ -874,19 +993,21 @@ async def wrapper(self, *args, **kwargs):
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
+    @stream_isolated
+    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int, interface: Interface = None) -> dict:
         if not is_hash256_str(tx_hash):
             raise Exception(f"{repr(tx_hash)} is not a txid")
         if not is_non_negative_integer(tx_height):
             raise Exception(f"{repr(tx_height)} is not a block height")
-        return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
+        return await interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
 
     @best_effort_reliable
-    async def broadcast_transaction(self, tx, *, timeout=None) -> None:
+    @stream_isolated
+    async def broadcast_transaction(self, tx, *, timeout=None, interface: Interface = None) -> None:
         if timeout is None:
             timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
         try:
-            out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout)
+            out = await interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout)
             # note: both 'out' and exception messages are untrusted input from the server
         except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
             raise  # pass-through
@@ -1042,44 +1163,50 @@ def sanitize_tx_broadcast_response(server_msg) -> str:
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
+    @stream_isolated
+    async def request_chunk(self, height: int, tip=None, *, can_return_early=False, interface: Interface = None):
         if not is_non_negative_integer(height):
             raise Exception(f"{repr(height)} is not a block height")
-        return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
+        return await interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
+    @stream_isolated
+    async def get_transaction(self, tx_hash: str, *, timeout=None, interface: Interface = None) -> str:
         if not is_hash256_str(tx_hash):
             raise Exception(f"{repr(tx_hash)} is not a txid")
-        return await self.interface.session.send_request('blockchain.transaction.get', [tx_hash],
+        return await interface.session.send_request('blockchain.transaction.get', [tx_hash],
                                                           timeout=timeout)
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
+    @stream_isolated
+    async def get_history_for_scripthash(self, sh: str, interface: Interface = None) -> List[dict]:
         if not is_hash256_str(sh):
             raise Exception(f"{repr(sh)} is not a scripthash")
-        return await self.interface.session.send_request('blockchain.scripthash.get_history', [sh])
+        return await interface.session.send_request('blockchain.scripthash.get_history', [sh])
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
+    @stream_isolated
+    async def listunspent_for_scripthash(self, sh: str, interface: Interface = None) -> List[dict]:
         if not is_hash256_str(sh):
             raise Exception(f"{repr(sh)} is not a scripthash")
-        return await self.interface.session.send_request('blockchain.scripthash.listunspent', [sh])
+        return await interface.session.send_request('blockchain.scripthash.listunspent', [sh])
 
     @best_effort_reliable
     @catch_server_exceptions
-    async def get_balance_for_scripthash(self, sh: str) -> dict:
+    @stream_isolated
+    async def get_balance_for_scripthash(self, sh: str, interface: Interface = None) -> dict:
         if not is_hash256_str(sh):
             raise Exception(f"{repr(sh)} is not a scripthash")
-        return await self.interface.session.send_request('blockchain.scripthash.get_balance', [sh])
+        return await interface.session.send_request('blockchain.scripthash.get_balance', [sh])
 
     @best_effort_reliable
-    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
+    @stream_isolated
+    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle, interface: Interface = None):
         command = 'blockchain.transaction.id_from_pos'
-        return await self.interface.session.send_request(command, [tx_height, tx_pos, merkle])
+        return await interface.session.send_request(command, [tx_height, tx_pos, merkle])
 
     def blockchain(self) -> Blockchain:
         interface = self.interface
@@ -1149,11 +1276,13 @@ async def _start(self):
         assert not self.main_taskgroup
         self.main_taskgroup = main_taskgroup = SilentTaskGroup()
         assert not self.interface and not self.interfaces
-        assert not self.connecting and not self.server_queue
+        assert not self.connecting and not self.connecting_clean and not self.server_queue and not self.server_queue_clean
        self.logger.info('starting network')
         self.disconnected_servers = set([])
+        self.disconnected_servers_clean = set([])
         self.protocol = deserialize_server(self.default_server)[2]
         self.server_queue = queue.Queue()
+        self.server_queue_clean = queue.Queue()
         self._set_proxy(deserialize_proxy(self.config.get('proxy')))
         self._set_oneserver(self.config.get('oneserver', False))
         self._start_interface(self.default_server)
@@ -1193,8 +1322,11 @@ async def _stop(self, full_shutdown=False):
         self.main_taskgroup = None  # type: TaskGroup
         self.interface = None  # type: Interface
         self.interfaces = {}  # type: Dict[str, Interface]
+        self.interfaces_for_stream_ids = {}  # type: Dict[str, Interface]
         self.connecting.clear()
+        self.connecting_clean.clear()
         self.server_queue = None
+        self.server_queue_clean = None
         if not full_shutdown:
             self.trigger_callback('network_updated')
@@ -1226,14 +1358,23 @@ async def launch_already_queued_up_new_interfaces():
             while self.server_queue.qsize() > 0:
                 server = self.server_queue.get()
                 await self.main_taskgroup.spawn(self._run_new_interface(server))
+            if self.proxy is not None:
+                while self.server_queue_clean.qsize() > 0:
+                    server = self.server_queue_clean.get()
+                    await self.main_taskgroup.spawn(self._run_new_interface_clean(server))
 
         async def maybe_queue_new_interfaces_to_be_launched_later():
             now = time.time()
             for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
                 # FIXME this should try to honour "healthy spread of connected servers"
                 self._start_random_interface()
+            if self.proxy is not None:
+                for i in range(self.num_server - len(self.interfaces_clean) - len(self.connecting_clean)):
+                    # FIXME this should try to honour "healthy spread of connected servers"
+                    self._start_random_interface_clean()
             if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
                 self.logger.info('network: retrying connections')
                 self.disconnected_servers = set([])
+                self.disconnected_servers_clean = set([])
                 self.nodes_retry_time = now
 
         async def maintain_healthy_spread_of_connected_servers():
             with self.interfaces_lock: interfaces = list(self.interfaces.values())
@@ -1243,6 +1384,14 @@ async def maintain_healthy_spread_of_connected_servers():
                     self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                      f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                     await self._close_interface(iface)
+            if self.proxy is not None:
+                with self.interfaces_lock: interfaces = list(self.interfaces_clean.values())
+                random.shuffle(interfaces)
+                for iface in interfaces:
+                    if not self.check_interface_against_healthy_spread_of_connected_servers(iface, check_clean_pool=True):
+                        self.logger.info(f"disconnecting from secondary {iface.server}. too many connected "
+                                         f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
+                        await self._close_interface(iface)
 
         async def maintain_main_interface():
             await self._ensure_there_is_a_main_interface()
             if self.is_connected():
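
The core of the patch is the `stream_isolated` decorator in network.py: a request that carries a `stream_id` kwarg is rerouted from the main interface to a dedicated `InterfaceSecondary` claimed from the clean pool, and that interface is then reserved for the same stream ID from then on. Below is a minimal, self-contained sketch of that dispatch pattern (this is not code from the patch; `Connection`, `Network`, and `get_balance` are simplified stand-ins for `Interface`/`InterfaceSecondary`, Electrum's `Network`, and the scripthash methods):

import asyncio
import random
from typing import Dict, Optional

class Connection:
    """Stand-in for Interface/InterfaceSecondary."""
    def __init__(self, server: str):
        self.server = server

    async def send_request(self, method: str, params: list):
        # A real implementation would speak the Electrum protocol here.
        return f"{method}{params} via {self.server}"

class Network:
    def __init__(self, clean_servers):
        self.main = Connection("main-server")
        # Idle connections waiting to be claimed by a stream ID.
        self.clean: Dict[str, Connection] = {s: Connection(s) for s in clean_servers}
        # stream_id -> the connection that stream ID has claimed.
        self.for_stream_ids: Dict[str, Connection] = {}

    def get_connection_for_stream_id(self, stream_id) -> Optional[Connection]:
        if stream_id is None:
            return self.main
        conn = self.for_stream_ids.get(stream_id)
        if conn is None:
            if not self.clean:
                return None  # no clean connection is ready
            # Claiming a connection makes it "dirty": it leaves the clean
            # pool and is reused only for this stream ID from now on.
            server = random.choice(list(self.clean))
            conn = self.clean.pop(server)
            self.for_stream_ids[stream_id] = conn
        return conn

    def stream_isolated(func):
        # Decorator: translate a stream_id kwarg into a pinned connection,
        # mirroring how the patch injects an `interface` kwarg.
        async def wrapper(self, *args, stream_id=None, **kwargs):
            conn = self.get_connection_for_stream_id(stream_id)
            if conn is None:
                raise Exception("No clean connection is ready")
            return await func(self, *args, conn=conn, **kwargs)
        return wrapper

    @stream_isolated
    async def get_balance(self, scripthash: str, conn: Connection = None):
        return await conn.send_request('blockchain.scripthash.get_balance', [scripthash])

async def demo():
    net = Network(["server-a", "server-b"])
    # Distinct stream IDs are served by distinct connections (and servers),
    # so no single server sees both queries and can link the two addresses.
    print(await net.get_balance("aa" * 32, stream_id="wallet-1"))
    print(await net.get_balance("bb" * 32, stream_id="wallet-2"))
    print(await net.get_balance("cc" * 32))  # no stream ID -> main connection

asyncio.run(demo())

One consequence of this design, visible in `InterfaceSecondary.ping` above, is that a claimed ("dirty") interface stops pinging once it leaves the clean pool and is allowed to time out when its stream ID falls idle, so abandoned stream IDs don't accumulate open connections.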
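From the user's side, every walletless command gains an optional stream ID. Hypothetical invocations follow, assuming the new `stream_id` entry in the command options dict is exposed as a `--stream_id` CLI flag the same way the other entries are, and that a Tor SOCKS proxy is configured; the addresses are placeholders:

# Both queries reuse the same secondary connection, so they are linkable
# to each other but not to the client's other traffic:
electrum-nmc --proxy socks5:127.0.0.1:9050 getaddressbalance <address-a> --stream_id wallet-a
electrum-nmc --proxy socks5:127.0.0.1:9050 getaddresshistory <address-a> --stream_id wallet-a

# A different stream ID is guaranteed a different TCP connection (drawn
# from the clean pool, typically a different server), so the two wallets
# can't be linked by any one server:
electrum-nmc --proxy socks5:127.0.0.1:9050 getaddressbalance <address-b> --stream_id wallet-b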