diff --git a/.github/workflows/black-formatter.yml b/.github/workflows/black-formatter.yml new file mode 100644 index 0000000..51a089a --- /dev/null +++ b/.github/workflows/black-formatter.yml @@ -0,0 +1,20 @@ +name: Code Style + +on: + push: + branches: + - master + pull_request: + schedule: + - cron: '0 12 * * 4' + +jobs: + black_formatter: + name: Black Formatter + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - uses: psf/black@stable + with: + options: "--check --verbose" diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 1d671bc..f1301f7 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -30,17 +30,12 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: # We must fetch at least the immediate parents so that if this is # a pull request then we can checkout the head. fetch-depth: 2 - # If this run was triggered by a pull request event, then checkout - # the head of the pull request instead of the merge commit. - - run: git checkout HEAD^2 - if: ${{ github.event_name == 'pull_request' }} - # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL uses: github/codeql-action/init@v2 diff --git a/Events.py b/Events.py index fb21053..5479f11 100644 --- a/Events.py +++ b/Events.py @@ -10,6 +10,7 @@ class EventType(Enum): """ Enum of event names used by TS3. """ + CLIENT_ENTER = "notifycliententerview" CLIENT_LEFT = "notifyclientleftview" CLIENT_MOVED = "notifyclientmoved" @@ -28,16 +29,24 @@ class EventType(Enum): # client events ... server_events = [EventType.SERVER_EDITED, EventType.CLIENT_ENTER, EventType.CLIENT_LEFT] text_events = [EventType.TEXT_MESSAGE] -channel_events = [EventType.CLIENT_ENTER, EventType.CLIENT_LEFT, EventType.CLIENT_MOVED, - EventType.CHANNEL_DESC_CHANGED, EventType.CHANNEL_EDITED, - EventType.CHANNEL_CREATED, EventType.CHANNEL_MOVED, - EventType.CHANNEL_DELETED, EventType.CHANNEL_PASSWORD_CHANGED] +channel_events = [ + EventType.CLIENT_ENTER, + EventType.CLIENT_LEFT, + EventType.CLIENT_MOVED, + EventType.CHANNEL_DESC_CHANGED, + EventType.CHANNEL_EDITED, + EventType.CHANNEL_CREATED, + EventType.CHANNEL_MOVED, + EventType.CHANNEL_DELETED, + EventType.CHANNEL_PASSWORD_CHANGED, +] class ReasonID(IntEnum): """ Enum of ReasonID given for events. """ + SELF_JOINED = 0 MOVED = 1 TIMEOUT = 3 @@ -49,10 +58,11 @@ class ReasonID(IntEnum): SERVER_SHUTDOWN = 11 -class TS3Event(): +class TS3Event: """ Event class for Teamspeak 3 events. This is a stub for all other events. """ + event_type = EventType.UNKNOWN def __init__(self, data, event_type=None): @@ -76,11 +86,10 @@ def __getattr__(self, item): :return: Attribute value string :rtype: str """ - print(self._data, item) return self._data[item] -class EventParser(): +class EventParser: @staticmethod def parse_event(event, event_type): """ @@ -96,7 +105,7 @@ def parse_event(event, event_type): parsed_event = TextMessageEvent(event) return parsed_event if EventType.CLIENT_MOVED.value == event_type: - if 'invokerid' in event: + if "invokerid" in event: parsed_event = ClientMovedEvent(event) else: parsed_event = ClientMovedSelfEvent(event) @@ -105,8 +114,10 @@ def parse_event(event, event_type): parsed_event = ClientEnteredEvent(event) return parsed_event if EventType.CLIENT_LEFT.value == event_type: - reason_id = int(event.get("reasonid", '-1')) - if reason_id == int(ReasonID.SERVER_KICK) or reason_id == int(ReasonID.CHANNEL_KICK): + reason_id = int(event.get("reasonid", "-1")) + if reason_id == int(ReasonID.SERVER_KICK) or reason_id == int( + ReasonID.CHANNEL_KICK + ): parsed_event = ClientKickedEvent(event) elif reason_id == int(ReasonID.BAN): parsed_event = ClientBannedEvent(event) @@ -142,6 +153,7 @@ class ServerEditedEvent(TS3Event): """ Event created when the server is edited. """ + event_type = EventType.SERVER_EDITED def __init__(self, data): @@ -185,16 +197,17 @@ class ChannelEditedEvent(TS3Event): """ Event created on channel edits. """ + event_type = EventType.CHANNEL_EDITED def __init__(self, data): super().__init__(data) - self._channel_id = data.get('cid', '-1') - self._channel_topic = data.get('channel_topic', '') - self._invoker_id = data.get('invokerid', '-1') - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '-1') - self._reason_id = data.get('reasonid', '-1') + self._channel_id = data.get("cid", "-1") + self._channel_topic = data.get("channel_topic", "") + self._invoker_id = data.get("invokerid", "-1") + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "-1") + self._reason_id = data.get("reasonid", "-1") @property def channel_id(self): @@ -225,16 +238,17 @@ class ChannelCreatedEvent(TS3Event): """ Event created on channel creation. """ + event_type = EventType.CHANNEL_CREATED def __init__(self, data): super().__init__(data) - self._channel_id = data.get('cid', '-1') - self._channel_topic = data.get('channel_topic', '') - self._invoker_id = data.get('invokerid', '-1') - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '-1') - self._reason_id = data.get('reasonid', '-1') + self._channel_id = data.get("cid", "-1") + self._channel_topic = data.get("channel_topic", "") + self._invoker_id = data.get("invokerid", "-1") + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "-1") + self._reason_id = data.get("reasonid", "-1") @property def channel_id(self): @@ -260,18 +274,20 @@ def invoker_uid(self): def reason_id(self): return self._reason_id + class ChannelDeletedEvent(TS3Event): """ Event created on channel deletion. """ + event_type = EventType.CHANNEL_DELETED def __init__(self, data): super().__init__(data) - self._channel_id = data.get('cid', '-1') - self._invoker_id = data.get('invokerid', '-1') - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '-1') + self._channel_id = data.get("cid", "-1") + self._invoker_id = data.get("invokerid", "-1") + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "-1") @property def channel_id(self): @@ -289,21 +305,23 @@ def invoker_name(self): def invoker_uid(self): return self._invoker_uid + class ChannelMovedEvent(TS3Event): """ Event created when a channel is moved in the server hierarchy. """ + event_type = EventType.CHANNEL_MOVED def __init__(self, data): super().__init__(data) - self._cid = data.get('cid', '-1') - self._cpid = data.get('cpid', '-1') - self._order = data.get('order', '-1') - self._reasonid = data.get('reasonid', '-1') - self._invoker_id = data.get('invokerid', '-1') - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '-1') + self._cid = data.get("cid", "-1") + self._cpid = data.get("cpid", "-1") + self._order = data.get("order", "-1") + self._reasonid = data.get("reasonid", "-1") + self._invoker_id = data.get("invokerid", "-1") + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "-1") @property def channel_id(self): @@ -333,29 +351,33 @@ def invoker_name(self): def invoker_uid(self): return self._invoker_uid + class ChannelDescriptionEditedEvent(TS3Event): """ Event created on channel description change. """ + event_type = EventType.CHANNEL_DESC_CHANGED def __init__(self, data): super().__init__(data) - self._channel_id = int(data.get('cid', '-1')) + self._channel_id = int(data.get("cid", "-1")) @property def channel_id(self): return self._channel_id + class ChannelPasswordChangedEvent(TS3Event): """ Event created on channel password change. """ + event_type = EventType.CHANNEL_PASSWORD_CHANGED def __init__(self, data): super().__init__(data) - self._channel_id = int(data.get('cid', '-1')) + self._channel_id = int(data.get("cid", "-1")) @property def channel_id(self): @@ -366,30 +388,33 @@ class ClientEnteredEvent(TS3Event): """ Event created when a client enters a channel. """ + event_type = EventType.CLIENT_ENTER def __init__(self, data): super().__init__(data) try: - self._client_id = int(data.get('clid', '-1')) - self._client_name = data.get('client_nickname', '') - self._client_uid = data.get('client_unique_identifier', '') - self._client_description = data.get('client_description', '') - self._client_country = data.get('client_country', '') - self._client_away = data.get('client_away', '') - self._client_away_msg = data.get('client_away_message', '') - self._client_input_muted = data.get('client_input_muted', '') - self._client_output_muted = data.get('client_output_muted', '') - self._client_outputonly_muted = data.get('client_outputonly_muted', '') - self._client_input_hardware = data.get('client_input_hardware', '') - self._client_output_hardware = data.get('client_output_hardware', '') - self._target_channel_id = int(data.get('ctid', '-1')) - self._from_channel_id = int(data.get('cfid', '-1')) - self._reason_id = int(data.get('reasonid', '-1')) - self._client_is_recording = data.get('client_is_recording', '') - self._client_dbid = data.get('client_database_id', '') - self._client_servergroups = data.get('client_servergroups', '') - self._client_channel_group_id = int(data.get('client_channel_group_id', '-1')) + self._client_id = int(data.get("clid", "-1")) + self._client_name = data.get("client_nickname", "") + self._client_uid = data.get("client_unique_identifier", "") + self._client_description = data.get("client_description", "") + self._client_country = data.get("client_country", "") + self._client_away = data.get("client_away", "") + self._client_away_msg = data.get("client_away_message", "") + self._client_input_muted = data.get("client_input_muted", "") + self._client_output_muted = data.get("client_output_muted", "") + self._client_outputonly_muted = data.get("client_outputonly_muted", "") + self._client_input_hardware = data.get("client_input_hardware", "") + self._client_output_hardware = data.get("client_output_hardware", "") + self._target_channel_id = int(data.get("ctid", "-1")) + self._from_channel_id = int(data.get("cfid", "-1")) + self._reason_id = int(data.get("reasonid", "-1")) + self._client_is_recording = data.get("client_is_recording", "") + self._client_dbid = data.get("client_database_id", "") + self._client_servergroups = data.get("client_servergroups", "") + self._client_channel_group_id = int( + data.get("client_channel_group_id", "-1") + ) except: self._logger.error("Failed to parse ClientEnterEvent:") self._logger.error(data) @@ -479,15 +504,16 @@ class ClientLeftEvent(TS3Event): """ Event created when a client leaves a channel. """ + event_type = EventType.CLIENT_LEFT def __init__(self, data): super().__init__(data) - self._client_id = int(data.get('clid', '-1')) - self._target_channel_id = int(data.get('ctid', '-1')) - self._from_channel_id = int(data.get('cfid', '-1')) - self._reason_id = int(data.get('reasonid', '-1')) - self._reason_msg = data.get('reasonmsg', '') + self._client_id = int(data.get("clid", "-1")) + self._target_channel_id = int(data.get("ctid", "-1")) + self._from_channel_id = int(data.get("cfid", "-1")) + self._reason_id = int(data.get("reasonid", "-1")) + self._reason_msg = data.get("reasonmsg", "") @property def client_id(self): @@ -510,11 +536,12 @@ class ClientKickedEvent(ClientLeftEvent): """ Event created when a client is kicked. """ + def __init__(self, data): super().__init__(data) - self._invoker_id = int(data.get('invokerid', '-1')) - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '') + self._invoker_id = int(data.get("invokerid", "-1")) + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "") @property def invoker_id(self): @@ -533,9 +560,10 @@ class ClientBannedEvent(ClientKickedEvent): """ Event created when a client is banned. """ + def __init__(self, data): super().__init__(data) - self._ban_time = int(data.get("bantime", '-1')) + self._ban_time = int(data.get("bantime", "-1")) @property def ban_time(self): @@ -546,16 +574,17 @@ class ClientMovedEvent(TS3Event): """ Event created when a client is moved from/to a channel. """ + event_type = EventType.CLIENT_MOVED def __init__(self, data): super().__init__(data) - self._client_id = int(data.get('clid', '-1')) - self._target_channel_id = int(data.get('ctid', '-1')) - self._reason_id = int(data.get('reasonid', '-1')) - self._invoker_id = int(data.get('invokerid', '-1')) - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '') + self._client_id = int(data.get("clid", "-1")) + self._target_channel_id = int(data.get("ctid", "-1")) + self._reason_id = int(data.get("reasonid", "-1")) + self._invoker_id = int(data.get("invokerid", "-1")) + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "") @property def client_id(self): @@ -586,13 +615,14 @@ class ClientMovedSelfEvent(TS3Event): """ Event created when a client is moves themselves from/to a channel. """ + event_type = EventType.CLIENT_MOVED def __init__(self, data): super().__init__(data) - self._client_id = int(data.get('clid', '-1')) - self._target_channel_id = int(data.get('ctid', '-1')) - self._reason_id = int(data.get('reasonid', '-1')) + self._client_id = int(data.get("clid", "-1")) + self._target_channel_id = int(data.get("ctid", "-1")) + self._reason_id = int(data.get("reasonid", "-1")) @property def client_id(self): @@ -611,22 +641,23 @@ class TextMessageEvent(TS3Event): """ Event created when a Text Message is received. """ + event_type = EventType.TEXT_MESSAGE def __init__(self, data): super().__init__(data) - if data.get('targetmode') == '1': - self._targetmode = 'Private' - self._target = data.get('target') - elif data.get('targetmode') == '2': - self._targetmode = 'Channel' - elif data.get('targetmode') == '3': - self._targetmode = 'Server' - - self._message = data.get('msg') - self._invoker_id = int(data.get('invokerid', '-1')) - self._invoker_name = data.get('invokername', '') - self._invoker_uid = data.get('invokeruid', '-1') + if data.get("targetmode") == "1": + self._targetmode = "Private" + self._target = data.get("target") + elif data.get("targetmode") == "2": + self._targetmode = "Channel" + elif data.get("targetmode") == "3": + self._targetmode = "Server" + + self._message = data.get("msg") + self._invoker_id = int(data.get("invokerid", "-1")) + self._invoker_name = data.get("invokername", "") + self._invoker_uid = data.get("invokeruid", "-1") @property def invoker_id(self): @@ -650,6 +681,6 @@ def targetmode(self): @property def target(self): - if self.targetmode == 'Private': + if self.targetmode == "Private": return self._target return None diff --git a/README.md b/README.md index 3d6020c..04769d8 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,9 @@ [![CodeQL](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/codeql-analysis.yml) [![Pylint](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/pylint.yml/badge.svg)](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/pylint.yml) +[![CodeQL](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/codeql-analysis.yml/badge.svg?branch=master)](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/codeql-analysis.yml?query=branch%3Amaster) +[![Code Style](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/black-formatter.yml/badge.svg?branch=master)](https://github.com/Murgeye/teamspeak3-python-api/actions/workflows/black-formatter.yml?query=branch%3Amaster) + Python 3 API that allows interactive access to the Teamspeak 3 Server Query interface. # Installation diff --git a/SSHConnWrapper.py b/SSHConnWrapper.py index dbfc7ed..00a44c0 100644 --- a/SSHConnWrapper.py +++ b/SSHConnWrapper.py @@ -9,13 +9,25 @@ from .utilities import TS3Exception, TS3ConnectionClosedException + class SSHConnWrapper: """ SSH wrapper for TS3 connections. """ + # pylint: disable=too-many-arguments - def __init__(self, host, port, username, password, accept_all_keys=False, host_key_file=None, - use_system_hosts=False, timeout=10, timeout_limit=3): + def __init__( + self, + host, + port, + username, + password, + accept_all_keys=False, + host_key_file=None, + use_system_hosts=False, + timeout=10, + timeout_limit=3, + ): """ Create a new SSH connection wrapper. :param host: Hostname of the Server to connect to. @@ -40,7 +52,9 @@ def __init__(self, host, port, username, password, accept_all_keys=False, host_k if host_key_file is not None and isfile(host_key_file): self._ssh_conn.load_host_keys(host_key_file) if username is not None and password is not None: - self._ssh_conn.connect(host, port=port, username=username, password=password) + self._ssh_conn.connect( + host, port=port, username=username, password=password + ) if host_key_file is not None: self._ssh_conn.save_host_keys(host_key_file) self._channel = self._ssh_conn.invoke_shell("raw") @@ -64,16 +78,17 @@ def read_until(self, delimiter, timeout=None): except socket.timeout as exc: timeout_cnt += 1 if timeout_cnt >= self.timeout_limit: - raise TS3ConnectionClosedException("SSH connection timeout\ - limit received!") from exc + raise TS3ConnectionClosedException( + "SSH connection timeout limit received!" + ) from exc continue if len(received) == 0: raise TS3ConnectionClosedException("SSH connection was closed!") self._buffer += received else: break - data = self._buffer[:delimiter_pos + len(delimiter)] - self._buffer = self._buffer[delimiter_pos + len(delimiter):] + data = self._buffer[: delimiter_pos + len(delimiter)] + self._buffer = self._buffer[delimiter_pos + len(delimiter) :] return data def write(self, data): @@ -86,7 +101,7 @@ def write(self, data): raise TS3ConnectionClosedException(OSError) from exc def close(self): - """" + """ Close the underlying SSH connection. """ self._ssh_conn.close() diff --git a/TS3Connection.py b/TS3Connection.py index 24fb8ce..27956ff 100644 --- a/TS3Connection.py +++ b/TS3Connection.py @@ -26,9 +26,20 @@ class TS3Connection: """ # pylint: disable=too-many-arguments - def __init__(self, host="127.0.0.1", port=10011, log_file="api.log", use_ssh=False, - username=None, password=None, accept_all_keys=False, host_key_file=None, - use_system_hosts=False, sshtimeout=None, sshtimeoutlimit=3): + def __init__( + self, + host="127.0.0.1", + port=10011, + log_file="api.log", + use_ssh=False, + username=None, + password=None, + accept_all_keys=False, + host_key_file=None, + use_system_hosts=False, + sshtimeout=None, + sshtimeoutlimit=3, + ): """ Creates a new TS3Connection. :param host: Host to connect to. Can be an IP address or a hostname. @@ -49,11 +60,13 @@ def __init__(self, host="127.0.0.1", port=10011, log_file="api.log", use_ssh=Fal self._data_read.set() self._data = None # create console handler and set level to warning - file_handler = logging.FileHandler(log_file, mode='a+') + file_handler = logging.FileHandler(log_file, mode="a+") file_handler.setLevel(logging.WARNING) # create formatter - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) # add formatter to ch file_handler.setFormatter(formatter) @@ -67,11 +80,18 @@ def __init__(self, host="127.0.0.1", port=10011, log_file="api.log", use_ssh=Fal self._logger.debug(self._conn.read_until(b"\n\r")) else: from .SSHConnWrapper import SSHConnWrapper - self._conn = SSHConnWrapper(host, port, username, password, - accept_all_keys=accept_all_keys, - host_key_file=host_key_file, timeout=sshtimeout, - timeout_limit=sshtimeoutlimit, - use_system_hosts=use_system_hosts) + + self._conn = SSHConnWrapper( + host, + port, + username, + password, + accept_all_keys=accept_all_keys, + host_key_file=host_key_file, + timeout=sshtimeout, + timeout_limit=sshtimeoutlimit, + use_system_hosts=use_system_hosts, + ) self._logger.debug(self._conn.read_until(b"\n\r")) self._logger.debug(self._conn.read_until(b"\n\r")) threading.Thread(target=self._recv).start() @@ -133,7 +153,7 @@ def _send(self, command, args=None, wait_for_resp=True, log_keepalive=False): :type log_keepalive: bool """ query = command - saved_resp = b'' + saved_resp = b"" ack = False if args is None: args = [] @@ -146,7 +166,7 @@ def _send(self, command, args=None, wait_for_resp=True, log_keepalive=False): self._logger.debug("Trying to acquire lock") if self._conn_lock.acquire(): self._logger.debug("Lock acquired") - if not query == b'\n\r' or query == b'\n\r' and log_keepalive: + if not query == b"\n\r" or query == b"\n\r" and log_keepalive: self._logger.debug("Query: %s", str(query)) self._logger.debug("Writing to connection") self._conn.write(query) @@ -161,12 +181,19 @@ def _send(self, command, args=None, wait_for_resp=True, log_keepalive=False): self._new_data.clear() self._data_read.set() if resp is not None: - if resp[0] == b'error': + if resp[0] == b"error": ack = True - if resp[1] != b'id=0': + if resp[1] != b"id=0": raise TS3QueryException( - int(resp[1].decode(encoding='UTF-8').split("=", 1)[1]), - resp[2].decode(encoding='UTF-8').split("=", 1)[1]) + int( + resp[1] + .decode(encoding="UTF-8") + .split("=", 1)[1] + ), + resp[2] + .decode(encoding="UTF-8") + .split("=", 1)[1], + ) else: self._logger.debug("Resp: %s", str(resp)) saved_resp += resp @@ -199,7 +226,8 @@ def _recv(self): try: self._conn.close() self._logger.debug( - "Releasing lock for closed connection to unfreeze threads ...") + "Releasing lock for closed connection to unfreeze threads ..." + ) self._conn_lock.release() # We really want to ignore ALL exceptions here! # pylint: disable=bare-except @@ -212,11 +240,13 @@ def _recv(self): if isinstance(data, TS3Event): event = data if isinstance(event, Events.TextMessageEvent): - signal = blinker.signal(event.event_type.name + "_" + event.targetmode.lower()) + signal = blinker.signal( + event.event_type.name + "_" + event.targetmode.lower() + ) else: signal = blinker.signal(event.event_type.name) self._logger.debug("Sending signal") - threading.Thread(target=signal.send, kwargs={'event': event}).start() + threading.Thread(target=signal.send, kwargs={"event": event}).start() continue if data is not None: self._data_read.wait() @@ -233,10 +263,10 @@ def _parse_resp_to_dict(resp): :return: Dictionary containing all info extracted from the response. :rtype: dict[str, str] """ - resp = resp.decode(encoding='UTF-8').split(" ") + resp = resp.decode(encoding="UTF-8").split(" ") info = {} for part in resp: - split = part.split('=', 1) + split = part.split("=", 1) # TODO: Handle empty data? if len(split) == 2: key, value = split @@ -275,7 +305,9 @@ def register_for_server_messages(self, event_listener=None, weak_ref=True): self._send("servernotifyregister", ["event=textserver"]) if event_listener is not None: for event in Events.text_events: - blinker.signal(event.name + "_server").connect(event_listener, weak=weak_ref) + blinker.signal(event.name + "_server").connect( + event_listener, weak=weak_ref + ) def register_for_channel_messages(self, event_listener=None, weak_ref=True): """ @@ -291,7 +323,9 @@ def register_for_channel_messages(self, event_listener=None, weak_ref=True): self._send("servernotifyregister", ["event=textchannel"]) if event_listener is not None: for event in Events.text_events: - blinker.signal(event.name + "_channel").connect(event_listener, weak=weak_ref) + blinker.signal(event.name + "_channel").connect( + event_listener, weak=weak_ref + ) def register_for_private_messages(self, event_listener=None, weak_ref=True): """ @@ -308,7 +342,9 @@ def register_for_private_messages(self, event_listener=None, weak_ref=True): self._send("servernotifyregister", ["event=textprivate"]) if event_listener is not None: for event in Events.text_events: - blinker.signal(event.name + "_private").connect(event_listener, weak=weak_ref) + blinker.signal(event.name + "_private").connect( + event_listener, weak=weak_ref + ) def register_for_server_events(self, event_listener=None, weak_ref=True): """ @@ -325,7 +361,9 @@ def register_for_server_events(self, event_listener=None, weak_ref=True): for event in Events.server_events: blinker.signal(event.name).connect(event_listener, weak=weak_ref) - def register_for_channel_events(self, channel_id, event_listener=None, weak_ref=True): + def register_for_channel_events( + self, channel_id, event_listener=None, weak_ref=True + ): """ Register event_listener for receiving channel_events. :param event_listener: Blinker signal handler function to be informed: @@ -388,8 +426,14 @@ def clientkick(self, client_id, reason_id, reason_msg): :param reason_msg: Message to send on kick, max. 40 characters :type reason_msg: str """ - self._send("clientkick", ["clid=" + str(client_id), "reasonid=" + str(reason_id), - "reasonmsg=" + str(reason_msg)]) + self._send( + "clientkick", + [ + "clid=" + str(client_id), + "reasonid=" + str(reason_id), + "reasonmsg=" + str(reason_msg), + ], + ) def whoami(self): """ @@ -403,9 +447,9 @@ def whoami(self): def channellist(self, params=None): """ - Returns the channel listt. - :param params: Optional parameters as defined by the serverquery manual. - :return: List of channels + Returns the channel listt. + :param params: Optional parameters as defined by the serverquery manual. + :return: List of channels """ if params is None: params = [] @@ -420,9 +464,9 @@ def channellist(self, params=None): def channel_name_list(self): """ - Returns a liszt of channel names. (Convenience Wrapper around channellist) - :return: List of channel names - """ + Returns a list of channel names. (Convenience Wrapper around channellist) + :return: List of channel names + """ names = [] channels = self.channellist() for channel in channels: @@ -437,7 +481,8 @@ def channelfind(self, pattern): :rtype: list[dict[str, str]] """ return TS3Connection._parse_resp_to_list_of_dicts( - self._send("channelfind", ["pattern=" + pattern])) + self._send("channelfind", ["pattern=" + pattern]) + ) def channelfind_by_name(self, name): """ @@ -463,8 +508,14 @@ def sendtextmessage(self, targetmode, target, msg): :type target: int :type msg: str """ - self._send("sendtextmessage", - ["targetmode=" + str(targetmode), "target=" + str(target), "msg=" + str(msg)]) + self._send( + "sendtextmessage", + [ + "targetmode=" + str(targetmode), + "target=" + str(target), + "msg=" + str(msg), + ], + ) def servergrouplist(self): """ @@ -517,7 +568,9 @@ def clientinfo(self, client_id): :return: Dictionary of client information. :rtype: dict[str,str] """ - return self._parse_resp_to_dict(self._send("clientinfo", ["clid=" + str(client_id)])) + return self._parse_resp_to_dict( + self._send("clientinfo", ["clid=" + str(client_id)]) + ) def clientpoke(self, clid, msg): """ @@ -527,8 +580,9 @@ def clientpoke(self, clid, msg): :type clid: int :type msg: str """ - return self._parse_resp_to_dict(self._send("clientpoke", ["clid=" + str(clid), - "msg=" + str(msg)])) + return self._parse_resp_to_dict( + self._send("clientpoke", ["clid=" + str(clid), "msg=" + str(msg)]) + ) def _parse_resp(self, resp): """ @@ -542,18 +596,18 @@ def _parse_resp(self, resp): :rtype: None | dict[str, str] | bytes """ # Acknowledgements - if resp.startswith(b'error'): - resp = resp.split(b' ') + if resp.startswith(b"error"): + resp = resp.split(b" ") return resp # Events - if resp.startswith(b'notify'): + if resp.startswith(b"notify"): event = {} event_type = "Unknown" try: - resp = resp.decode(encoding='UTF-8').split(" ") + resp = resp.decode(encoding="UTF-8").split(" ") event_type = resp[0] for info in resp[1:]: - split = info.split('=', 1) + split = info.split("=", 1) if len(split) == 2: key, value = split event[key] = utilities.unescape(value) @@ -643,9 +697,11 @@ def wrapper(*args, **kwargs): :param kwargs: dict of labeled parameters within the function head :return: (List of) Dictionary response or nothing, depends on ts3server response """ - resp = self._send(item, - ['-{}'.format(x) for x in args] + ['{}={}'.format(x[0], x[1]) for x in - kwargs.items()]) + resp = self._send( + item, + ["-{}".format(x) for x in args] + + ["{}={}".format(x[0], x[1]) for x in kwargs.items()], + ) if resp: parsed_resp = self._parse_resp_to_list_of_dicts(resp) return parsed_resp[0] if len(parsed_resp) == 1 else parsed_resp @@ -669,7 +725,8 @@ def __init__(self, error_id, message): self._type = TS3QueryExceptionType(error_id) self._msg = utilities.unescape(message) super(TS3QueryException, self).__init__( - "Query failed with id=" + str(error_id) + " msg=" + str(self._msg)) + "Query failed with id=" + str(error_id) + " msg=" + str(self._msg) + ) @property def message(self): diff --git a/TS3QueryExceptionType.py b/TS3QueryExceptionType.py index fa21d8f..62c969f 100644 --- a/TS3QueryExceptionType.py +++ b/TS3QueryExceptionType.py @@ -9,6 +9,7 @@ class TS3QueryExceptionType(IntEnum): """ TS3 Query Exception based on TS3 error codes. """ + OK = 0 UNDEFINED = 1 NOT_IMPLEMENTED = 2 diff --git a/__init__.py b/__init__.py index 8134a86..a527bc5 100644 --- a/__init__.py +++ b/__init__.py @@ -3,5 +3,5 @@ """ -__author__ = 'Fabian Ising, Daniel Lukats' +__author__ = "Fabian Ising, Daniel Lukats" __version__ = "0.9.4" diff --git a/pyproject.toml b/pyproject.toml index 611fb7c..992f849 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,3 +14,7 @@ addopts = [ ignore-paths = "tests" fail-under = "9.0" suggestion-mode = 'yes' + +[tool.black] +target-version = ['py311'] +include = '\.py$' diff --git a/socket_wrapper.py b/socket_wrapper.py index d2e911b..7ced4df 100644 --- a/socket_wrapper.py +++ b/socket_wrapper.py @@ -5,10 +5,12 @@ from .utilities import TS3ConnectionClosedException + class SocketWrapper: """ Socket wrapper for TS3 connections. Provides some helper functions. """ + # pylint: disable=too-many-arguments def __init__(self, host, port, timeout=10, timeout_limit=3): """ @@ -41,16 +43,17 @@ def read_until(self, delimiter, timeout=None): except socket.timeout as exc: timeout_cnt += 1 if timeout_cnt >= self.timeout_limit: - raise TS3ConnectionClosedException("Socket connection timeout\ - limit received!") from exc + raise TS3ConnectionClosedException( + "Socket connection timeout limit received!" + ) from exc continue if len(received) == 0: raise TS3ConnectionClosedException("Socket connection was closed!") self._buffer += received else: break - data = self._buffer[:delimiter_pos + len(delimiter)] - self._buffer = self._buffer[delimiter_pos + len(delimiter):] + data = self._buffer[: delimiter_pos + len(delimiter)] + self._buffer = self._buffer[delimiter_pos + len(delimiter) :] if timeout is not None: self._conn.settimeout(self.timeout) return data @@ -65,7 +68,7 @@ def write(self, data): raise TS3ConnectionClosedException(OSError) from exc def close(self): - """" + """ Close the underlying connection. """ self._conn.close() diff --git a/tests/test_TS3Connection.py b/tests/test_TS3Connection.py index 262d98a..cd687eb 100644 --- a/tests/test_TS3Connection.py +++ b/tests/test_TS3Connection.py @@ -4,6 +4,7 @@ from ts3API.Events import * from ts3API.TS3Connection import TS3Connection + class MockTS3Connection(TS3Connection): def __init__(self): self._logger = logging.Logger(__name__, logging.DEBUG) @@ -15,27 +16,46 @@ class TestTS3Connection(TestCase): def setUp(self) -> None: self.conn = MockTS3Connection() + def test_parse_resp_left_event(self): + resp = b"notifyclientleftview cfid=1 ctid=0 reasonid=8 reasonmsg=Left. clid=1" + result = self.conn._parse_resp(resp) + self.assertIs(ClientLeftEvent, type(result), "ClientLeft not parsed correctly") + self.assertEqual(result.client_id, 1, "ClientLeft not parsed correctly") + def test_parse_resp_kicked_event(self): - resp = b"notifyclientleftview cfid=1 ctid=0 reasonid=" + str( - ReasonID.SERVER_KICK.value).encode("ascii") + b" reasonmsg=Kicked. clid=1" + resp = ( + b"notifyclientleftview cfid=1 ctid=0 reasonid=" + + str(ReasonID.SERVER_KICK.value).encode("ascii") + + b" reasonmsg=Kicked. clid=1" + ) result = self.conn._parse_resp(resp) - self.assertIs(ClientKickedEvent, type(result), "Client kick not parsed correctly") + self.assertIs( + ClientKickedEvent, type(result), "Client kick not parsed correctly" + ) self.assertEqual(1, result.client_id, "Client kick not parsed correctly") def test_parse_banned_event(self): - resp = b"notifyclientleftview cfid=1 ctid=0 reasonid=" + str(ReasonID.BAN.value).encode( - "ascii") + b" reasonmsg=Kicked. clid=1 bantime=10 invokerid=2 invokername=Test invokeruid=sdfsadf" + resp = ( + b"notifyclientleftview cfid=1 ctid=0 reasonid=" + + str(ReasonID.BAN.value).encode("ascii") + + b" reasonmsg=Kicked. clid=1 bantime=10 invokerid=2 invokername=Test invokeruid=sdfsadf" + ) result = self.conn._parse_resp(resp) - self.assertIs(ClientBannedEvent, type(result), "Client ban not parsed correctly") + self.assertIs( + ClientBannedEvent, type(result), "Client ban not parsed correctly" + ) self.assertEqual(1, result.client_id, "Client ban not parsed correctly") self.assertEqual(10, result.ban_time, "Client ban not parsed correctly") self.assertEqual(2, result.invoker_id, "Client ban not parsed correctly") self.assertEqual("Test", result.invoker_name, "Client ban not parsed correctly") - self.assertEqual("sdfsadf", result.invoker_uid, "Client ban not parsed correctly") + self.assertEqual( + "sdfsadf", result.invoker_uid, "Client ban not parsed correctly" + ) def test_parse_resp_left_event(self): - resp = b"notifyclientleftview cfid=1 ctid=0 reasonid=8 " \ - b"reasonmsg=Left. clid=1" + resp = ( + b"notifyclientleftview cfid=1 ctid=0 reasonid=8 " b"reasonmsg=Left. clid=1" + ) result = self.conn._parse_resp(resp) self.assertIs(ClientLeftEvent, type(result), "ClientLeft not parsed correctly") self.assertEqual(result.client_id, 1, "ClientLeft not parsed correctly") @@ -43,21 +63,32 @@ def test_parse_resp_left_event(self): def test_parse_resp_left_event_missing_reason_id(self): resp = b"notifyclientleftview reasonmsg=Left. clid=1" result = self.conn._parse_resp(resp) - self.assertIs(ClientLeftEvent, type(result), - "ClientLeft without reason id not parsed correctly") - self.assertEqual(1, result.client_id, "ClientLeft without reason id not parsed correctly") + self.assertIs( + ClientLeftEvent, + type(result), + "ClientLeft without reason id not parsed correctly", + ) + self.assertEqual( + 1, result.client_id, "ClientLeft without reason id not parsed correctly" + ) def test_parse_resp_left_event_empty(self): resp = b"notifyclientleftview" result = self.conn._parse_resp(resp) - self.assertIs(ClientLeftEvent, type(result), "Empty client left not parsed correctly") + self.assertIs( + ClientLeftEvent, type(result), "Empty client left not parsed correctly" + ) self.assertEqual(-1, result.client_id, "Empty ClientLeft not parsed correctly") def test_parse_server_edited_event(self): - resp = b"notifyserveredited reasonid=0 invokerid=1 invokername=test " \ - b"invokeruid=asdf virtualserver_name=new_name virtualserver_name_phonetic=neeew\\sname" + resp = ( + b"notifyserveredited reasonid=0 invokerid=1 invokername=test " + b"invokeruid=asdf virtualserver_name=new_name virtualserver_name_phonetic=neeew\\sname" + ) result = self.conn._parse_resp(resp) - self.assertIs(ServerEditedEvent, type(result), "ServerEdited Event not parsed correctly") + self.assertIs( + ServerEditedEvent, type(result), "ServerEdited Event not parsed correctly" + ) assert result.invoker_id == "1" assert result.invoker_uid == "asdf" assert result.invoker_name == "test" @@ -66,12 +97,15 @@ def test_parse_server_edited_event(self): assert len(p) == 2 assert p["virtualserver_name"] == "new_name" assert p["virtualserver_name_phonetic"] == "neeew name" - def test_parse_server_edited_event_empty(self): resp = b"notifyserveredited" result = self.conn._parse_resp(resp) - self.assertIs(ServerEditedEvent, type(result), "Empty ServerEdited Event not parsed correctly") + self.assertIs( + ServerEditedEvent, + type(result), + "Empty ServerEdited Event not parsed correctly", + ) assert result.invoker_id == "-1" assert result.invoker_uid == "-1" assert result.invoker_name == "" diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 924a0a8..389668c 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -3,6 +3,7 @@ import sys from ts3API import utilities + def test_escape_all(): """ Simple test: just concatenate all values and check the escaping @@ -14,6 +15,7 @@ def test_escape_all(): expected += r assert utilities.escape(unescaped) == expected + def test_unescape_all(): """ Simple test: just concatenate all replacements and check the unescaping @@ -25,6 +27,7 @@ def test_unescape_all(): escaped += r assert utilities.unescape(escaped) == expected + def test_escape_order(): """ Check if the order of escaping is correct. @@ -33,6 +36,7 @@ def test_escape_order(): expected = "\\\\r\\n" assert utilities.escape(unescaped) == expected + def test_escape_whitespace(): """ Check whitespace escaping. diff --git a/utilities.py b/utilities.py index 34de431..8cc91b6 100644 --- a/utilities.py +++ b/utilities.py @@ -2,8 +2,19 @@ # FROM OLD API # Don't change the order in this map, otherwise it might break! -_ESCAPE_MAP = [("\\", r"\\"), ("/", r"\/"), (" ", r"\s"), ("|", r"\p"), ("\a", r"\a"), - ("\b", r"\b"), ("\f", r"\f"), ("\n", r"\n"), ("\r", r"\r"), ("\t", r"\t"), ("\v", r"\v")] +_ESCAPE_MAP = [ + ("\\", r"\\"), + ("/", r"\/"), + (" ", r"\s"), + ("|", r"\p"), + ("\a", r"\a"), + ("\b", r"\b"), + ("\f", r"\f"), + ("\n", r"\n"), + ("\r", r"\r"), + ("\t", r"\t"), + ("\v", r"\v"), +] def escape(raw):