diff --git a/awscrt/mqtt_request_response.py b/awscrt/mqtt_request_response.py new file mode 100644 index 000000000..1fd661b85 --- /dev/null +++ b/awscrt/mqtt_request_response.py @@ -0,0 +1,217 @@ +""" +MQTT Request Response module +""" + +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from enum import IntEnum +from dataclasses import dataclass +from typing import Callable, Union +from awscrt import NativeResource, mqtt5, mqtt, exceptions +from concurrent.futures import Future +import _awscrt +import collections.abc + +class SubscriptionStatusEventType(IntEnum): + """ + The type of change to the state of a streaming operation subscription + """ + + SUBSCRIPTION_ESTABLISHED = 0 + """ + The streaming operation is successfully subscribed to its topic (filter) + """ + + SUBSCRIPTION_LOST = 1 + """ + The streaming operation has temporarily lost its subscription to its topic (filter) + """ + + SUBSCRIPTION_HALTED = 2 + """ + The streaming operation has entered a terminal state where it has given up trying to subscribe + to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy). + """ + + +@dataclass +class SubscriptionStatusEvent: + """ + An event that describes a change in subscription status for a streaming operation. + + Args: + type (SubscriptionStatusEventType): The type of status change represented by the event + error (Exception): Describes an underlying reason for the event. Only set for SubscriptionLost and SubscriptionHalted. + """ + type: SubscriptionStatusEventType = None + error: Exception = None + + +@dataclass +class IncomingPublishEvent: + """ + An event that describes an incoming message on a streaming operation. + + Args: + topic (str): MQTT Topic that the response was received on. + payload (Optional[bytes]): The payload of the incoming message. + """ + topic: str + payload: 'Optional[bytes]' = None + + +""" +Signature for a handler that listens to subscription status events. +""" +SubscriptionStatusListener = Callable[[SubscriptionStatusEvent], None] + +""" +Signature for a handler that listens to incoming publish events. +""" +IncomingPublishListener = Callable[[IncomingPublishEvent], None] + + +@dataclass +class Response: + """ + Encapsulates a response to an AWS IoT Core MQTT-based service request + + Args: + topic (str): MQTT Topic that the response was received on. + payload (Optional[bytes]): The payload of the response. + """ + topic: str + payload: 'Optional[bytes]' = None + + +@dataclass +class ResponsePath: + """ + A response path is a pair of values - MQTT topic and a JSON path - that describe how a response to + an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each + one is associated with a separate JSON schema for the response body. + + Args: + topic (str): MQTT topic that a response may arrive on. + correlation_token_json_path (Optional[str]): JSON path for finding correlation tokens within payloads that arrive on this path's topic. + """ + topic: str + correlation_token_json_path: 'Optional[str]' = None + + def validate(self): + assert isinstance(self.topic, str) + assert isinstance(self.correlation_token_json_path, str) or self.correlation_token_json_path is None + +@dataclass +class RequestResponseOperationOptions: + """ + Configuration options for an MQTT-based request-response operation. + + Args: + subscription_topic_filters (Sequence[str]): Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes using wildcards can cut down on the subscriptions needed; other times that isn't valid. + response_paths (Sequence[ResponsePath]): Set of all possible response paths associated with this request type. + publish_topic (str): Topic to publish the request to once response subscriptions have been established. + payload (bytes): Payload to publish to 'publishTopic' in order to initiate the request + correlation_token (Optional[str]): Correlation token embedded in the request that must be found in a response message. This can be null to support certain services which don't use correlation tokens. In that case, the client only allows one token-less request at a time. + """ + subscription_topic_filters: 'Sequence[str]' + response_paths: 'Sequence[ResponsePath]' + publish_topic: str + payload: bytes + correlation_token: 'Optional[str]' = None + + def validate(self): + assert isinstance(self.subscription_topic_filters, collections.abc.Sequence) + for topic_filter in self.subscription_topic_filters: + assert isinstance(topic_filter, str) + + assert isinstance(self.response_paths, collections.abc.Sequence) + for response_path in self.response_paths: + response_path.validate() + + assert isinstance(self.publish_topic, str) + assert isinstance(self.payload, bytes) + assert isinstance(self.correlation_token, str) or self.correlation_token is None + +@dataclass +class StreamingOperationOptions: + """ + Configuration options for an MQTT-based streaming operation. + + Args: + subscription_topic_filter (str): Topic filter that the streaming operation should listen on + """ + subscription_topic_filter: str + + +@dataclass +class RequestResponseClientOptions: + """ + MQTT-based request-response client configuration options + + Args: + max_request_response_subscriptions (int): Maximum number of subscriptions that the client will concurrently use for request-response operations + max_streaming_subscriptions (int): Maximum number of subscriptions that the client will concurrently use for streaming operations + operation_timeout_in_seconds (Optional[int]): Duration, in seconds, that a request-response operation will wait for completion before giving up + """ + max_request_response_subscriptions: int + max_streaming_subscriptions: int + operation_timeout_in_seconds: 'Optional[int]' = 60 + + def validate(self): + assert isinstance(self.max_request_response_subscriptions, int) + assert isinstance(self.max_streaming_subscriptions, int) + assert isinstance(self.operation_timeout_in_seconds, int) + + +class Client(NativeResource): + """ + MQTT-based request-response client tuned for AWS MQTT services. + + Supports streaming operations (listen to a stream of modeled events from an MQTT topic) and request-response + operations (performs the subscribes, publish, and incoming publish correlation and error checking needed to + perform simple request-response operations over MQTT). + + Args: + protocol_client (Union[mqtt5.Client, mqtt.Connection]): MQTT client to use as transport + client_options (ClientOptions): The ClientOptions dataclass to used to configure the new request response Client. + + """ + + def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection], client_options: RequestResponseClientOptions): + + assert isinstance(protocol_client, mqtt5.Client) or isinstance(protocol_client, mqtt.Connection) + assert isinstance(client_options, RequestResponseClientOptions) + #client_options.validate() + + super().__init__() + + if isinstance(protocol_client, mqtt5.Client): + self._binding = _awscrt.mqtt_request_response_client_new_from_5(protocol_client, client_options) + else: + self._binding = _awscrt.mqtt_request_response_client_new_from_311(protocol_client, client_options) + + + def make_request(self, options: RequestResponseOperationOptions): + #options.validate() + + future = Future() + + def on_request_complete(error_code, topic, payload): + if error_code != 0: + future.set_exception(exceptions.from_code(error_code)) + else: + response = Response(topic=topic, payload=payload) + future.set_result(response) + + _awscrt.mqtt_request_response_client_make_request(self._binding, + options.subscription_topic_filters, + options.response_paths, + options.publish_topic, + options.payload, + options.correlation_token, + on_request_complete) + + return future + diff --git a/crt/aws-c-auth b/crt/aws-c-auth index e0bd58d17..cd9d6afcd 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit e0bd58d172cdc78d62eff5728437790d06fcce50 +Subproject commit cd9d6afcd42035d49bb2d0d3bef24b9faed57773 diff --git a/source/module.c b/source/module.c index 7a2b7a974..2a000caf5 100644 --- a/source/module.c +++ b/source/module.c @@ -14,6 +14,7 @@ #include "mqtt5_client.h" #include "mqtt_client.h" #include "mqtt_client_connection.h" +#include "mqtt_request_response.h" #include "s3.h" #include "websocket.h" @@ -125,6 +126,11 @@ uint32_t PyObject_GetAttrAsUint32(PyObject *o, const char *class_name, const cha return result; } + if (attr == Py_None) { + PyErr_Format(PyExc_AttributeError, "'%s.%s' required integral attribute is None", class_name, attr_name); + return result; + } + PyObject_GetAsOptionalUint32(attr, class_name, attr_name, &result); Py_DECREF(attr); @@ -742,6 +748,11 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS), + /* MQTT Request Response Client */ + AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_5, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_311, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt_request_response_client_make_request, METH_VARARGS), + /* Cryptographic primitives */ AWS_PY_METHOD_DEF(md5_new, METH_NOARGS), AWS_PY_METHOD_DEF(sha256_new, METH_NOARGS), diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..4293eb490 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -1888,7 +1888,9 @@ PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args) { Py_INCREF(metadata->callback); struct aws_mqtt5_subscribe_completion_options subscribe_completion_options = { - .completion_callback = &s_on_subscribe_complete_fn, .completion_user_data = metadata}; + .completion_callback = &s_on_subscribe_complete_fn, + .completion_user_data = metadata, + }; if (aws_mqtt5_client_subscribe(client->native, &subscribe_view, &subscribe_completion_options)) { PyErr_SetAwsLastError(); diff --git a/source/mqtt_request_response.c b/source/mqtt_request_response.c new file mode 100644 index 000000000..f19ffdfe2 --- /dev/null +++ b/source/mqtt_request_response.c @@ -0,0 +1,486 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include "mqtt_request_response.h" + +#include "mqtt5_client.h" +#include "mqtt_client_connection.h" + +#include "aws/mqtt/request-response/request_response_client.h" + +static const char *s_capsule_name_mqtt_request_response_client = "aws_mqtt_request_response_client"; + +static const char *AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS = "RequestResponseClientOptions"; +static const char *AWS_PYOBJECT_KEY_MAX_REQUEST_RESPONSE_SUBSCRIPTIONS = "max_request_response_subscriptions"; +static const char *AWS_PYOBJECT_KEY_MAX_STREAMING_SUBSCRIPTIONS = "max_streaming_subscriptions"; +static const char *AWS_PYOBJECT_KEY_OPERATION_TIMEOUT_IN_SECONDS = "operation_timeout_in_seconds"; + +struct mqtt_request_response_client_binding { + struct aws_mqtt_request_response_client *native; +}; + +static void s_mqtt_request_response_python_client_destructor(PyObject *client_capsule) { + struct mqtt_request_response_client_binding *client_binding = + PyCapsule_GetPointer(client_capsule, s_capsule_name_mqtt_request_response_client); + assert(client_binding); + + client_binding->native = aws_mqtt_request_response_client_release(client_binding->native); + + aws_mem_release(aws_py_get_allocator(), client_binding); +} + +/* + * Returns success as true/false. If not successful, a python error will be set, so the caller does not need to check + */ +static bool s_init_mqtt_request_response_client_options( + struct aws_mqtt_request_response_client_options *client_options, + PyObject *client_options_py) { + AWS_ZERO_STRUCT(*client_options); + + uint32_t max_request_response_subscriptions = PyObject_GetAttrAsUint32( + client_options_py, + AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS, + AWS_PYOBJECT_KEY_MAX_REQUEST_RESPONSE_SUBSCRIPTIONS); + if (PyErr_Occurred()) { + PyErr_Format(PyErr_Occurred(), "Cannot convert max_request_response_subscriptions to a C uint32"); + return false; + } + + uint32_t max_streaming_subscriptions = PyObject_GetAttrAsUint32( + client_options_py, + AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS, + AWS_PYOBJECT_KEY_MAX_STREAMING_SUBSCRIPTIONS); + if (PyErr_Occurred()) { + PyErr_Format(PyErr_Occurred(), "Cannot convert max_streaming_subscriptions to a C uint32"); + return false; + } + + uint32_t timeout_in_seconds = PyObject_GetAttrAsUint32( + client_options_py, + AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS, + AWS_PYOBJECT_KEY_OPERATION_TIMEOUT_IN_SECONDS); + if (PyErr_Occurred()) { + PyErr_Format(PyErr_Occurred(), "Cannot convert operation_timeout_in_seconds to a C uint32_t"); + return false; + } + + client_options->max_request_response_subscriptions = (size_t)max_request_response_subscriptions; + client_options->max_streaming_subscriptions = (size_t)max_streaming_subscriptions; + client_options->operation_timeout_seconds = (uint32_t)timeout_in_seconds; + + return true; +} + +PyObject *aws_py_mqtt_request_response_client_new_from_5(PyObject *self, PyObject *args) { + (void)self; + + PyObject *mqtt5_client_py = NULL; + PyObject *client_options_py = NULL; + + if (!PyArg_ParseTuple( + args, + "OO", + /* O */ &mqtt5_client_py, + /* O */ &client_options_py)) { + return NULL; + } + + struct aws_mqtt5_client *protocol_client = aws_py_get_mqtt5_client(mqtt5_client_py); + if (protocol_client == NULL) { + return NULL; + } + + struct aws_mqtt_request_response_client_options client_options; + if (!s_init_mqtt_request_response_client_options(&client_options, client_options_py)) { + return NULL; + } + + struct aws_allocator *allocator = aws_py_get_allocator(); + + struct aws_mqtt_request_response_client *rr_client = + aws_mqtt_request_response_client_new_from_mqtt5_client(allocator, protocol_client, &client_options); + if (rr_client == NULL) { + PyErr_SetAwsLastError(); + return NULL; + } + + struct mqtt_request_response_client_binding *client_binding = + aws_mem_calloc(allocator, 1, sizeof(struct mqtt_request_response_client_binding)); + // Python object that wraps a c struct and a function to call when its reference goes to zero + PyObject *capsule = PyCapsule_New( + client_binding, s_capsule_name_mqtt_request_response_client, s_mqtt_request_response_python_client_destructor); + if (!capsule) { + aws_mem_release(allocator, client_binding); + aws_mqtt_request_response_client_release(rr_client); + return NULL; + } + + client_binding->native = rr_client; + + return capsule; +} + +PyObject *aws_py_mqtt_request_response_client_new_from_311(PyObject *self, PyObject *args) { + (void)self; + + PyObject *mqtt_connection_py = NULL; + PyObject *client_options_py = NULL; + + if (!PyArg_ParseTuple( + args, + "OO", + /* O */ &mqtt_connection_py, + /* O */ &client_options_py)) { + return NULL; + } + + struct aws_mqtt_client_connection *protocol_client = aws_py_get_mqtt_client_connection(mqtt_connection_py); + if (protocol_client == NULL) { + return NULL; + } + + struct aws_mqtt_request_response_client_options client_options; + if (!s_init_mqtt_request_response_client_options(&client_options, client_options_py)) { + return NULL; + } + + struct aws_allocator *allocator = aws_py_get_allocator(); + + struct aws_mqtt_request_response_client *rr_client = + aws_mqtt_request_response_client_new_from_mqtt311_client(allocator, protocol_client, &client_options); + if (rr_client == NULL) { + PyErr_SetAwsLastError(); + return NULL; + } + + struct mqtt_request_response_client_binding *client_binding = + aws_mem_calloc(allocator, 1, sizeof(struct mqtt_request_response_client_binding)); + // Python object that wraps a c struct and a function to call when its reference goes to zero + PyObject *capsule = PyCapsule_New( + client_binding, s_capsule_name_mqtt_request_response_client, s_mqtt_request_response_python_client_destructor); + if (!capsule) { + aws_mem_release(allocator, client_binding); + aws_mqtt_request_response_client_release(rr_client); + return NULL; + } + + client_binding->native = rr_client; + + return capsule; +} + +/*************************************************************************************************/ + +static const char *AWS_PYOBJECT_KEY_TOPIC = "topic"; +static const char *AWS_PYOBJECT_KEY_CORRELATION_TOKEN_JSON_PATH = "correlation_token_json_path"; + +static void s_cleanup_subscription_topic_filters(struct aws_array_list *subscription_topic_filters) { + size_t filter_count = aws_array_list_length(subscription_topic_filters); + for (size_t i = 0; i < filter_count; ++i) { + struct aws_byte_buf filter_buffer; + aws_array_list_get_at(subscription_topic_filters, &filter_buffer, i); + + aws_byte_buf_clean_up(&filter_buffer); + } + + aws_array_list_clean_up(subscription_topic_filters); +} + +static bool s_init_subscription_topic_filters( + struct aws_array_list *subscription_topic_filters, + PyObject *subscription_topic_filters_py) { + AWS_ZERO_STRUCT(*subscription_topic_filters); + + if (!PySequence_Check(subscription_topic_filters_py)) { + PyErr_Format(PyExc_TypeError, "subscription_topic_filters must be a sequence type"); + return false; + } + + Py_ssize_t filter_count = PySequence_Size(subscription_topic_filters_py); + if (filter_count <= 0) { + PyErr_Format(PyExc_TypeError, "subscription_topic_filters must have at least one element"); + return false; + } + + bool success = false; + struct aws_allocator *allocator = aws_py_get_allocator(); + aws_array_list_init_dynamic( + subscription_topic_filters, allocator, (size_t)filter_count, sizeof(struct aws_byte_buf)); + + for (size_t i = 0; i < (size_t)filter_count; ++i) { + PyObject *entry_py = PySequence_GetItem(subscription_topic_filters_py, i); + if (entry_py == NULL) { + goto done; + } + + struct aws_byte_cursor topic_filter_cursor = aws_byte_cursor_from_pyunicode(entry_py); + + struct aws_byte_buf topic_filter; + aws_byte_buf_init_copy_from_cursor(&topic_filter, allocator, topic_filter_cursor); + + aws_array_list_push_back(subscription_topic_filters, &topic_filter); + + Py_XDECREF(entry_py); + + if (PyErr_Occurred()) { + goto done; + } + } + + success = true; + +done: + + if (!success) { + s_cleanup_subscription_topic_filters(subscription_topic_filters); + AWS_ZERO_STRUCT(*subscription_topic_filters); + } + + return success; +} + +struct aws_request_response_path { + struct aws_byte_buf topic; + struct aws_byte_buf correlation_token_json_path; +}; + +static void s_cleanup_response_paths(struct aws_array_list *response_paths) { + size_t path_count = aws_array_list_length(response_paths); + for (size_t i = 0; i < path_count; ++i) { + struct aws_request_response_path response_path; + aws_array_list_get_at(response_paths, &response_path, i); + + aws_byte_buf_clean_up(&response_path.topic); + aws_byte_buf_clean_up(&response_path.correlation_token_json_path); + } + + aws_array_list_clean_up(response_paths); +} + +static bool s_init_response_paths(struct aws_array_list *response_paths, PyObject *response_paths_py) { + AWS_ZERO_STRUCT(*response_paths); + + if (!PySequence_Check(response_paths_py)) { + PyErr_Format(PyExc_TypeError, "response_paths must be a sequence type"); + return false; + } + + Py_ssize_t path_count = PySequence_Size(response_paths_py); + if (path_count <= 0) { + PyErr_Format(PyExc_TypeError, "response_paths must have at least one element"); + return false; + } + + bool success = false; + struct aws_allocator *allocator = aws_py_get_allocator(); + aws_array_list_init_dynamic( + response_paths, allocator, (size_t)path_count, sizeof(struct aws_request_response_path)); + + for (size_t i = 0; i < (size_t)path_count; ++i) { + PyObject *entry_py = PySequence_GetItem(response_paths_py, i); + if (entry_py == NULL) { + goto done; + } + + PyObject *topic_py = PyObject_GetAttrString(entry_py, AWS_PYOBJECT_KEY_TOPIC); + PyObject *correlation_token_json_path_py = PyObject_GetAttrString(entry_py, AWS_PYOBJECT_KEY_CORRELATION_TOKEN_JSON_PATH); + if (topic_py != NULL && correlation_token_json_path_py != NULL) { + struct aws_byte_cursor topic_cursor = aws_byte_cursor_from_pyunicode(topic_py); + struct aws_byte_cursor correlation_token_json_path_cursor; + AWS_ZERO_STRUCT(correlation_token_json_path_cursor); + if (correlation_token_json_path_py != Py_None) { + correlation_token_json_path_cursor = aws_byte_cursor_from_pyunicode(correlation_token_json_path_py); + } + + struct aws_request_response_path response_path; + aws_byte_buf_init_copy_from_cursor(&response_path.topic, allocator, topic_cursor); + aws_byte_buf_init_copy_from_cursor( + &response_path.correlation_token_json_path, allocator, correlation_token_json_path_cursor); + + aws_array_list_push_back(response_paths, &response_path); + } + + Py_XDECREF(topic_py); + Py_XDECREF(correlation_token_json_path_py); + Py_XDECREF(entry_py); + + if (PyErr_Occurred()) { + PyErr_Format(PyExc_TypeError, "invalid response path"); + goto done; + } + } + + success = true; + +done: + + if (!success) { + s_cleanup_response_paths(response_paths); + AWS_ZERO_STRUCT(*response_paths); + } + + return success; +} + +struct aws_mqtt_make_request_binding { + PyObject *on_request_complete_callback; +}; + +static struct aws_mqtt_make_request_binding *s_aws_mqtt_make_request_binding_new( + PyObject *on_request_complete_callable_py) { + struct aws_mqtt_make_request_binding *binding = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct aws_mqtt_make_request_binding)); + + binding->on_request_complete_callback = on_request_complete_callable_py; + Py_XINCREF(binding->on_request_complete_callback); + + return binding; +} + +static void s_aws_mqtt_make_request_binding_destroy(struct aws_mqtt_make_request_binding *binding) { + if (binding == NULL) { + return; + } + + Py_XDECREF(binding->on_request_complete_callback); + + aws_mem_release(aws_py_get_allocator(), binding); +} + +static void s_on_mqtt_request_complete( + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload, + int error_code, + void *user_data) { + + struct aws_mqtt_make_request_binding *request_binding = user_data; + + PyGILState_STATE state; + if (aws_py_gilstate_ensure(&state)) { + return; + } + + PyObject *result = PyObject_CallFunction( + request_binding->on_request_complete_callback, + "(is#y#)", + /* i */ error_code, + /* s */ response_topic ? response_topic->ptr : NULL, + /* # */ response_topic ? response_topic->len : 0, + /* y */ payload ? payload->ptr : NULL, + /* # */ payload ? payload->len : 0); + if (!result) { + PyErr_WriteUnraisable(PyErr_Occurred()); + } + + Py_XDECREF(result); + + s_aws_mqtt_make_request_binding_destroy(request_binding); + + PyGILState_Release(state); +} + +PyObject *aws_py_mqtt_request_response_client_make_request(PyObject *self, PyObject *args) { + (void)self; + + PyObject *client_capsule_py; + PyObject *subscription_topic_filters_py; + PyObject *response_paths_py; + struct aws_byte_cursor publish_topic; + struct aws_byte_cursor payload; + struct aws_byte_cursor correlation_token; + PyObject *on_request_complete_callable_py; + + if (!PyArg_ParseTuple( + args, + "OOOs#y#z#O", + /* O */ &client_capsule_py, + /* O */ &subscription_topic_filters_py, + /* O */ &response_paths_py, + /* s */ &publish_topic.ptr, + /* # */ &publish_topic.len, + /* y */ &payload.ptr, + /* # */ &payload.len, + /* z */ &correlation_token.ptr, + /* # */ &correlation_token.len, + /* O */ &on_request_complete_callable_py)) { + return NULL; + } + + struct mqtt_request_response_client_binding *client_binding = + PyCapsule_GetPointer(client_capsule_py, s_capsule_name_mqtt_request_response_client); + if (!client_binding) { + return NULL; + } + + PyObject *result = NULL; + + struct aws_array_list subscription_topic_filters; // array_list + AWS_ZERO_STRUCT(subscription_topic_filters); + + struct aws_array_list response_paths; // array_list + AWS_ZERO_STRUCT(response_paths); + + if (!s_init_subscription_topic_filters(&subscription_topic_filters, subscription_topic_filters_py) || + !s_init_response_paths(&response_paths, response_paths_py)) { + goto done; + } + + { + result = Py_None; + size_t subscription_count = aws_array_list_length(&subscription_topic_filters); + struct aws_byte_cursor subscription_topic_filter_cursors[subscription_count]; + for (size_t i = 0; i < subscription_count; ++i) { + struct aws_byte_buf topic_filter; + aws_array_list_get_at(&subscription_topic_filters, &topic_filter, i); + + subscription_topic_filter_cursors[i] = aws_byte_cursor_from_buf(&topic_filter); + } + + size_t response_path_count = aws_array_list_length(&response_paths); + struct aws_mqtt_request_operation_response_path response_path_values[response_path_count]; + for (size_t i = 0; i < response_path_count; ++i) { + struct aws_request_response_path response_path; + aws_array_list_get_at(&response_paths, &response_path, i); + + response_path_values[i].topic = aws_byte_cursor_from_buf(&response_path.topic); + response_path_values[i].correlation_token_json_path = + aws_byte_cursor_from_buf(&response_path.correlation_token_json_path); + } + + struct aws_mqtt_make_request_binding *request_binding = + s_aws_mqtt_make_request_binding_new(on_request_complete_callable_py); + + struct aws_mqtt_request_operation_options request_options = { + .subscription_topic_filters = subscription_topic_filter_cursors, + .subscription_topic_filter_count = subscription_count, + .response_paths = response_path_values, + .response_path_count = response_path_count, + .publish_topic = publish_topic, + .serialized_request = payload, + .correlation_token = correlation_token, + .completion_callback = s_on_mqtt_request_complete, + .user_data = request_binding, + }; + + if (aws_mqtt_request_response_client_submit_request(client_binding->native, &request_options)) { + s_on_mqtt_request_complete(NULL, NULL, aws_last_error(), request_binding); + } + } + +done: + + s_cleanup_subscription_topic_filters(&subscription_topic_filters); + s_cleanup_response_paths(&response_paths); + + return result; +} + +struct aws_mqtt_request_response_client *aws_py_get_mqtt_request_response_client( + PyObject *mqtt_request_response_client) { + AWS_PY_RETURN_NATIVE_FROM_BINDING( + mqtt_request_response_client, + s_capsule_name_mqtt_request_response_client, + "Client", + mqtt_request_response_client_binding); +} \ No newline at end of file diff --git a/source/mqtt_request_response.h b/source/mqtt_request_response.h new file mode 100644 index 000000000..3d1c1c38d --- /dev/null +++ b/source/mqtt_request_response.h @@ -0,0 +1,21 @@ +#ifndef AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H +#define AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "module.h" + +struct aws_mqtt_request_response_client; + +PyObject *aws_py_mqtt_request_response_client_new_from_5(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt_request_response_client_new_from_311(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt_request_response_client_make_request(PyObject *self, PyObject *args); + +/* Given a python object, return a pointer to its underlying native type. + * If NULL is returned, a python error has been set */ +struct aws_mqtt_request_response_client *aws_py_get_mqtt_request_response_client( + PyObject *mqtt_request_response_client); + +#endif /* AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H */ \ No newline at end of file diff --git a/test/test_mqtt_request_response.py b/test/test_mqtt_request_response.py new file mode 100644 index 000000000..5afbe99bb --- /dev/null +++ b/test/test_mqtt_request_response.py @@ -0,0 +1,549 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from test import NativeResourceTest +from awscrt import io, mqtt5, mqtt_request_response, mqtt + +from concurrent.futures import Future +import os +import time +import unittest +import uuid + +TIMEOUT = 30.0 + +def create_client_id(): + return f"aws-crt-python-unit-test-{uuid.uuid4()}" + +def _get_env_variable(env_name): + env_data = os.environ.get(env_name) + if not env_data: + raise unittest.SkipTest(f"test requires env var: {env_name}") + return env_data + +class MqttRequestResponse5TestCallbacks(): + def __init__(self): + self.future_connection_success = Future() + self.future_stopped = Future() + + def ws_handshake_transform(self, transform_args): + transform_args.set_done() + + def on_publish_received(self, publish_received_data: mqtt5.PublishReceivedData): + pass + + def on_lifecycle_stopped(self, lifecycle_stopped: mqtt5.LifecycleStoppedData): + if self.future_stopped: + self.future_stopped.set_result(None) + + def on_lifecycle_attempting_connect(self, lifecycle_attempting_connect: mqtt5.LifecycleAttemptingConnectData): + pass + + def on_lifecycle_connection_success(self, lifecycle_connection_success: mqtt5.LifecycleConnectSuccessData): + if self.future_connection_success: + self.future_connection_success.set_result(lifecycle_connection_success) + + def on_lifecycle_connection_failure(self, lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData): + if self.future_connection_success: + if self.future_connection_success.done(): + pass + else: + self.future_connection_success.set_exception(lifecycle_connection_failure.exception) + + def on_lifecycle_disconnection(self, lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData): + pass + +def _empty_response_paths(options): + options.response_paths = [] + +def _invalidate_response_path_topic(options): + options.response_paths[0].topic = "a/#/b" + +def _none_response_path_topic(options): + options.response_paths[0].topic = None + +def _missing_response_path_topic(options): + del options.response_paths[0].topic + +def _type_mismatch_response_path_topic(options): + options.response_paths[0].topic = 57.3 + +def _type_mismatch_response_path_correlation_token_json_path(options): + options.response_paths[0].correlation_token_json_path = [] + +def _type_mismatch_response_paths(options): + options.response_paths = "hello" + +def _invalidate_subscription_topic_filter(options): + options.subscription_topic_filters[0] = "a/#/c" + +def _type_mismatch_subscription_topic_filter(options): + options.subscription_topic_filters[0] = [ "thirty", 30 ] + +def _type_mismatch_subscriptions(options): + options.subscription_topic_filters = 50 + +def _empty_subscription_topic_filters(options): + options.subscription_topic_filters = [] + +def _none_publish_topic(options): + options.publish_topic = None + +def _bad_publish_topic(options): + options.publish_topic = "#/b/c" + +def _type_mismatch_publish_topic(options): + options.publish_topic = [["oof"]] + +def _type_mismatch_correlation_token(options): + options.correlation_token = [-1] + +class MqttRequestResponseClientTest(NativeResourceTest): + + def _create_client5(self): + + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + client_options.connect_options = mqtt5.ConnectPacket() + client_options.connect_options.client_id = create_client_id() + + callbacks = MqttRequestResponse5TestCallbacks() + client_options.on_lifecycle_event_stopped_fn = callbacks.on_lifecycle_stopped + client_options.on_lifecycle_event_connection_success_fn = callbacks.on_lifecycle_connection_success + client_options.on_lifecycle_event_connection_failure_fn = callbacks.on_lifecycle_connection_failure + client_options.on_lifecycle_event_stopped_fn = callbacks.on_lifecycle_stopped + + protocol_client = mqtt5.Client(client_options) + protocol_client.start() + + callbacks.future_connection_success.result(TIMEOUT) + + return protocol_client, callbacks + + def _shutdown5(self, protocol_client, callbacks): + + protocol_client.stop() + callbacks.future_stopped.result(TIMEOUT) + + + def _create_client311(self): + + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + tls_ctx = io.ClientTlsContext(tls_ctx_options) + + client = mqtt.Client(None, tls_ctx) + + protocol_client = mqtt.Connection( + client=client, + client_id=create_client_id(), + host_name=input_host_name, + port=8883, + ping_timeout_ms=10000, + keep_alive_secs=30 + ) + protocol_client.connect().result(TIMEOUT) + + return protocol_client + + def _shutdown311(self, protocol_client): + protocol_client.disconnect().result(TIMEOUT) + + def _create_rr_client(self, protocol_client, max_request_response_subscriptions, max_streaming_subscriptions, operation_timeout_seconds): + rr_client_options = mqtt_request_response.RequestResponseClientOptions(max_request_response_subscriptions, max_streaming_subscriptions) + rr_client_options.operation_timeout_in_seconds = operation_timeout_seconds + + rr_client = mqtt_request_response.Client(protocol_client, rr_client_options) + + return rr_client + + def _do_mqtt5_test(self, test_callable): + (protocol_client, callbacks) = self._create_client5() + + test_callable(protocol_client) + + self._shutdown5(protocol_client, callbacks) + + def _do_mqtt311_test(self, test_callable): + protocol_client = self._create_client311() + + test_callable(protocol_client) + + self._shutdown311(protocol_client) + + def _create_get_shadow_request(self, thing_name, use_correlation_token): + topic_prefix = f"$aws/things/{thing_name}/shadow/get" + + request_options = mqtt_request_response.RequestResponseOperationOptions( + subscription_topic_filters=[f"{topic_prefix}/+"], + response_paths=[ + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/accepted"), + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/rejected") + ], + publish_topic = topic_prefix, + payload="{}".encode() + ) + + if use_correlation_token: + correlation_token = f"{uuid.uuid4()}" + request_options.response_paths[0].correlation_token_json_path = "clientToken" + request_options.response_paths[1].correlation_token_json_path = "clientToken" + request_options.payload = f'{{"clientToken":"{correlation_token}"}}'.encode() + request_options.correlation_token = correlation_token + + return request_options + + def _do_get_shadow_success_no_such_shadow(self, rr_client, thing_name, use_correlation_token): + request_options = self._create_get_shadow_request(thing_name, use_correlation_token) + + request_future = rr_client.make_request(request_options) + response = request_future.result() + + self.assertEqual(request_options.response_paths[1].topic, response.topic, + "Expected response to come in on rejected topic") + payload = str(response.payload) + + self.assertIn("No shadow exists with name", payload) + + def _do_get_shadow_success_no_such_shadow_test(self, protocol_client, use_correlation_token): + rr_client = self._create_rr_client(protocol_client, 2, 2, 30) + + thing_name = f"tn-{uuid.uuid4()}" + self._do_get_shadow_success_no_such_shadow(rr_client, thing_name, use_correlation_token) + + + def _do_get_shadow_success(self, rr_client, thing_name, use_correlation_token): + request_options = self._create_get_shadow_request(thing_name, use_correlation_token) + + request_future = rr_client.make_request(request_options) + response = request_future.result() + + self.assertEqual(request_options.response_paths[0].topic, response.topic, + "Expected response to come in on accepted topic") + + payload = str(response.payload) + self.assertIn("magic", payload) + + def _do_update_shadow_success(self, rr_client, thing_name, use_correlation_token): + topic_prefix = f"$aws/things/{thing_name}/shadow/update" + + request_options = mqtt_request_response.RequestResponseOperationOptions( + subscription_topic_filters=[f"{topic_prefix}/accepted", f"{topic_prefix}/rejected"], + response_paths=[ + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/accepted"), + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/rejected") + ], + publish_topic = topic_prefix, + payload="".encode() + ) + + desired_state = f'{{"magic":"value"}}' + + if use_correlation_token: + correlation_token = f"{uuid.uuid4()}" + request_options.response_paths[0].correlation_token_json_path = "clientToken" + request_options.response_paths[1].correlation_token_json_path = "clientToken" + request_options.payload = f'{{"clientToken":"{correlation_token}","state":{{"desired":{desired_state}}}}}'.encode() + request_options.correlation_token = correlation_token + else: + request_options.payload = f'{{"state":{{"desired":{desired_state}}}}}'.encode() + + request_future = rr_client.make_request(request_options) + response = request_future.result() + + self.assertEqual(request_options.response_paths[0].topic, response.topic, + "Expected response to come in on accepted topic") + + payload = str(response.payload) + self.assertIn("magic", payload) + + def _do_delete_shadow_success(self, rr_client, thing_name, use_correlation_token): + topic_prefix = f"$aws/things/{thing_name}/shadow/delete" + + request_options = mqtt_request_response.RequestResponseOperationOptions( + subscription_topic_filters=[f"{topic_prefix}/+"], + response_paths=[ + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/accepted"), + mqtt_request_response.ResponsePath(topic=f"{topic_prefix}/rejected") + ], + publish_topic = topic_prefix, + payload="{}".encode() + ) + + if use_correlation_token: + correlation_token = f"{uuid.uuid4()}" + request_options.response_paths[0].correlation_token_json_path = "clientToken" + request_options.response_paths[1].correlation_token_json_path = "clientToken" + request_options.payload = f'{{"clientToken":"{correlation_token}"}}'.encode() + request_options.correlation_token = correlation_token + + request_future = rr_client.make_request(request_options) + response = request_future.result() + + self.assertEqual(request_options.response_paths[0].topic, response.topic, + "Expected response to come in on accepted topic") + + payload = str(response.payload) + self.assertIn("version", payload) + + def _do_update_delete_shadow_success_test(self, protocol_client, use_correlation_token): + rr_client = self._create_rr_client(protocol_client, 2, 2, 30) + + # get should return non-existence + thing_name = f"tn-{uuid.uuid4()}" + + try: + self._do_get_shadow_success_no_such_shadow(rr_client, thing_name, use_correlation_token) + + # update shadow to create + self._do_update_shadow_success(rr_client, thing_name, use_correlation_token) + + # eventual consistency worries + time.sleep(2) + + # get should now return the shadow state + self._do_get_shadow_success(rr_client, thing_name, use_correlation_token) + finally: + # delete shadow + self._do_delete_shadow_success(rr_client, thing_name, use_correlation_token) + + + def _do_get_shadow_failure_test(self, protocol_client, options_transform): + rr_client = self._create_rr_client(protocol_client, 2, 2, 30) + + thing_name = f"tn-{uuid.uuid4()}" + request_options = self._create_get_shadow_request(thing_name, True) + options_transform(request_options) + + self.assertRaises(Exception, lambda: rr_client.make_request(request_options)) + + def _do_get_shadow_future_failure_test(self, protocol_client, options_transform): + rr_client = self._create_rr_client(protocol_client, 2, 2, 30) + + thing_name = f"tn-{uuid.uuid4()}" + request_options = self._create_get_shadow_request(thing_name, True) + options_transform(request_options) + + response_future = rr_client.make_request(request_options) + + self.assertRaises(Exception, lambda: response_future.result()) + + # ============================================================== + # CREATION SUCCESS TEST CASES + # ============================================================== + + def test_client_creation_success5(self): + self._do_mqtt5_test(lambda protocol_client: self._create_rr_client(protocol_client, 2, 2, 30)) + + def test_client_creation_success311(self): + self._do_mqtt311_test(lambda protocol_client: self._create_rr_client(protocol_client, 2, 2, 30)) + + # ============================================================== + # CREATION FAILURE TEST CASES + # ============================================================== + + def test_client_creation_failure_no_protocol_client(self): + self.assertRaises(Exception, self._create_rr_client, None, 2, 2, 30) + + def test_client_creation_failure_zero_request_response_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 0, 2, 30)) + + def test_client_creation_failure_zero_request_response_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 0, 2, 30)) + + def test_client_creation_failure_negative_request_response_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, -2, 2, 30)) + + def test_client_creation_failure_negative_request_response_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, -2, 2, 30)) + + def test_client_creation_failure_no_request_response_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, None, 2, 30)) + + def test_client_creation_failure_no_request_response_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, None, 2, 30)) + + def test_client_creation_failure_request_response_subscriptions_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, "None", 2, 30)) + + def test_client_creation_failure_request_response_subscriptions_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, "None", 2, 30)) + + def test_client_creation_failure_negative_streaming_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, -2, 30)) + + def test_client_creation_failure_negative_streaming_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, -2, 30)) + + def test_client_creation_failure_no_streaming_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, None, 30)) + + def test_client_creation_failure_no_streaming_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, None, 30)) + + def test_client_creation_failure_streaming_subscriptions_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, [], 30)) + + def test_client_creation_failure_streaming_subscriptions_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, {}, 30)) + + def test_client_creation_failure_negative_operation_timeout5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, -30)) + + def test_client_creation_failure_negative_operation_timeout311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, -30)) + + def test_client_creation_failure_no_operation_timeout5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, None)) + + def test_client_creation_failure_no_operation_timeout311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, None)) + def test_client_creation_failure_operation_timeout_invalid5(self): + self._do_mqtt5_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, 523.56)) + + def test_client_creation_failure_operation_timeout_invalid311(self): + self._do_mqtt311_test(lambda protocol_client: self.assertRaises(Exception, self._create_rr_client, protocol_client, 2, 2, 777777777777777777777777777777777777)) + + # ============================================================== + # make_request SUCCESS TEST CASES + # ============================================================== + + def test_get_shadow_success_no_such_shadow5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_success_no_such_shadow_test(protocol_client, True)) + + def test_get_shadow_success_no_such_shadow311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_success_no_such_shadow_test(protocol_client, True)) + + def test_get_shadow_success_no_such_shadow_no_correlation_token5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_success_no_such_shadow_test(protocol_client, False)) + + def test_get_shadow_success_no_such_shadow_no_correlation_token311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_success_no_such_shadow_test(protocol_client, False)) + + def test_update_delete_shadow_success5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_update_delete_shadow_success_test(protocol_client, True)) + + def test_update_delete_shadow_success311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_update_delete_shadow_success_test(protocol_client, True)) + + def test_update_delete_shadow_success_no_correlation_token5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_update_delete_shadow_success_test(protocol_client, False)) + + def test_update_delete_shadow_success_no_correlation_token311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_update_delete_shadow_success_test(protocol_client, False)) + + # ============================================================== + # make_request FAILURE TEST CASES + # ============================================================== + + def test_get_shadow_failure_no_response_paths5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _empty_response_paths(options))) + + def test_get_shadow_failure_no_response_paths311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _empty_response_paths(options))) + + def test_get_shadow_failure_invalid_response_path_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _invalidate_response_path_topic(options))) + + def test_get_shadow_failure_invalid_response_path_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _invalidate_response_path_topic(options))) + + def test_get_shadow_failure_none_response_path_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _none_response_path_topic(options))) + + def test_get_shadow_failure_none_response_path_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _none_response_path_topic(options))) + + def test_get_shadow_failure_missing_response_path_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _missing_response_path_topic(options))) + + def test_get_shadow_failure_missing_response_path_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _missing_response_path_topic(options))) + + def test_get_shadow_failure_response_path_topic_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_path_topic(options))) + + def test_get_shadow_failure_response_path_topic_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_path_topic(options))) + + def test_get_shadow_failure_response_path_correlation_token_json_path_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_path_correlation_token_json_path(options))) + + def test_get_shadow_failure_response_path_correlation_token_json_path_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_path_correlation_token_json_path(options))) + + def test_get_shadow_failure_response_paths_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_paths(options))) + + def test_get_shadow_failure_response_paths_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_response_paths(options))) + + def test_get_shadow_failure_invalid_subscription_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _invalidate_subscription_topic_filter(options))) + + def test_get_shadow_failure_invalid_subscription_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _invalidate_subscription_topic_filter(options))) + + def test_get_shadow_failure_subscription_topic_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_subscription_topic_filter(options))) + + def test_get_shadow_failure_subscription_topic_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_subscription_topic_filter(options))) + + def test_get_shadow_failure_subscriptions_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_subscriptions(options))) + + def test_get_shadow_failure_subscriptions_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_subscriptions(options))) + + def test_get_shadow_failure_empty_subscriptions5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _empty_subscription_topic_filters(options))) + + def test_get_shadow_failure_empty_subscriptions311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _empty_subscription_topic_filters(options))) + + def test_get_shadow_failure_none_publish_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _none_publish_topic(options))) + + def test_get_shadow_failure_none_publish_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _none_publish_topic(options))) + + def test_get_shadow_failure_bad_publish_topic5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _bad_publish_topic(options))) + + def test_get_shadow_failure_bad_publish_topic311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_future_failure_test(protocol_client, lambda options: _bad_publish_topic(options))) + + def test_get_shadow_failure_publish_topic_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_publish_topic(options))) + + def test_get_shadow_failure_publish_topic_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_publish_topic(options))) + + def test_get_shadow_failure_correlation_token_type_mismatch5(self): + self._do_mqtt5_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_correlation_token(options))) + + def test_get_shadow_failure_correlation_token_type_mismatch311(self): + self._do_mqtt311_test(lambda protocol_client: self._do_get_shadow_failure_test(protocol_client, lambda options: _type_mismatch_correlation_token(options))) + +if __name__ == 'main': + unittest.main()