diff --git a/examples/mqtt/tls_mutual_auth/.vscode/c_cpp_properties.json b/examples/mqtt/tls_mutual_auth/.vscode/c_cpp_properties.json new file mode 100644 index 000000000..7a5fe1bf2 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/.vscode/c_cpp_properties.json @@ -0,0 +1,22 @@ +{ + "configurations": [ + { + "name": "Win32", + "includePath": [ + "${workspaceFolder}/**", + "C:/Users/saite/esp/esp-idf/**" + ], + "defines": [ + "_DEBUG", + "UNICODE", + "_UNICODE" + ], + "compilerPath": "C:\\MinGW\\bin\\gcc.exe", + "cStandard": "c11", + "cppStandard": "c++17", + "intelliSenseMode": "windows-gcc-x64", + "compileCommands": "${workspaceFolder}/build/compile_commands.json" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/.vscode/settings.json b/examples/mqtt/tls_mutual_auth/.vscode/settings.json new file mode 100644 index 000000000..b3ece145f --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/.vscode/settings.json @@ -0,0 +1,12 @@ +{ + "idf.adapterTargetName": "esp32c3", + "idf.openOcdConfigs": [ + "board/esp32c3-builtin.cfg" + ], + "idf.portWin": "COM3", + "idf.flashType": "UART", + "files.associations": { + "esp_event.h": "c", + "mqtt_subscription_manager.h": "c" + } +} \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/CMakeLists.txt b/examples/mqtt/tls_mutual_auth/main/CMakeLists.txt index 068741bf7..f651af01c 100644 --- a/examples/mqtt/tls_mutual_auth/main/CMakeLists.txt +++ b/examples/mqtt/tls_mutual_auth/main/CMakeLists.txt @@ -1,6 +1,7 @@ set(COMPONENT_SRCS "app_main.c" "mqtt_demo_mutual_auth.c" + "mqtt_subscription_manager.c" ) set(COMPONENT_ADD_INCLUDEDIRS diff --git a/examples/mqtt/tls_mutual_auth/main/app_mqtt_defines.h b/examples/mqtt/tls_mutual_auth/main/app_mqtt_defines.h new file mode 100644 index 000000000..abcac7bee --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/app_mqtt_defines.h @@ -0,0 +1,30 @@ +/** + * @brief Structure to keep the MQTT publish packets until an ack is received + * for QoS1 publishes. + */ +typedef struct PublishPackets +{ + /** + * @brief Packet identifier of the publish packet. + */ + uint16_t packetId; + + /** + * @brief Publish info of the publish packet. + */ + MQTTPublishInfo_t pubInfo; +} PublishPackets_t; + +typedef enum +{ + ESTABLISH_BROKER_TLS_CONNC, + ESTABLISH_MQTT_CONNC, + ACTION_ON_PEND_PUBLISHES, + SUBSCRIBE_TO_TOPICS, + PUBLISH_AND_RECV_FROM_CLOUD +}APP_COMMUNICATION_STATES; + +typedef struct sAppCloudMsg +{ + PublishPackets_t PublishPacket; +}AppCloudMsg_t; diff --git a/examples/mqtt/tls_mutual_auth/main/certs/8938ed8d2f0f34ebf9b031dbeee9b7a95f6e5264f846d511386f80d348cb5ced-public.pem.key b/examples/mqtt/tls_mutual_auth/main/certs/8938ed8d2f0f34ebf9b031dbeee9b7a95f6e5264f846d511386f80d348cb5ced-public.pem.key new file mode 100644 index 000000000..de38f153c --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/certs/8938ed8d2f0f34ebf9b031dbeee9b7a95f6e5264f846d511386f80d348cb5ced-public.pem.key @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0TBorE4t0R3BNeseiQvQ +K4i/I5dex5r0/7IcXa9mmTDX6Pn2u18dCeV0b8djs44dMdDTx8eZKwMp5TOsuWla +HHIqSCnYoxG9OfMF45YTWFl1Se7OfPSkAwSxTbq7X8UrYtHUi4xL+Er9AgUKh2vh +sBAawJ+xnE4ZRUfiwnjwIQm1MJoTLbyE7PakO6KCA2t6fa6p6pxl/MJl6ToiqdB9 +tEGQpaA9ePvLSt+m4VNzzA8QQD7M2OP1ZZLf7Cn7Ol5MwezJOpYOf0JUO3VVYzW5 +NpdBM3BEo+rQ8MQbB2j2a/Nl5/QEfSgHOds/dTxvz+6KBgGTcpqI/ZGkjdEyte+4 +awIDAQAB +-----END PUBLIC KEY----- diff --git a/examples/mqtt/tls_mutual_auth/main/certs/AmazonRootCA3.pem b/examples/mqtt/tls_mutual_auth/main/certs/AmazonRootCA3.pem new file mode 100644 index 000000000..3796bb2e5 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/certs/AmazonRootCA3.pem @@ -0,0 +1,12 @@ +-----BEGIN CERTIFICATE----- +MIIBtjCCAVugAwIBAgITBmyf1XSXNmY/Owua2eiedgPySjAKBggqhkjOPQQDAjA5 +MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6b24g +Um9vdCBDQSAzMB4XDTE1MDUyNjAwMDAwMFoXDTQwMDUyNjAwMDAwMFowOTELMAkG +A1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJvb3Qg +Q0EgMzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABCmXp8ZBf8ANm+gBG1bG8lKl +ui2yEujSLtf6ycXYqm0fc4E7O5hrOXwzpcVOho6AF2hiRVd9RFgdszflZwjrZt6j +QjBAMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMB0GA1UdDgQWBBSr +ttvXBp43rDCGB5Fwx5zEGbF4wDAKBggqhkjOPQQDAgNJADBGAiEA4IWSoxe3jfkr +BqWTrBqYaGFy+uGh0PsceGCmQ5nFuMQCIQCcAu/xlJyzlvnrxir4tiz+OpAUFteM +YyRIHN8wfdVoOw== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/certs/client.crt b/examples/mqtt/tls_mutual_auth/main/certs/client.crt index 6459f31e3..d8c3e74b6 100644 --- a/examples/mqtt/tls_mutual_auth/main/certs/client.crt +++ b/examples/mqtt/tls_mutual_auth/main/certs/client.crt @@ -1 +1,20 @@ -Certificate goes here. \ No newline at end of file +-----BEGIN CERTIFICATE----- +MIIDWTCCAkGgAwIBAgIUXFLyVb6kxkRdcdCyQxlebXNASHswDQYJKoZIhvcNAQEL +BQAwTTFLMEkGA1UECwxCQW1hem9uIFdlYiBTZXJ2aWNlcyBPPUFtYXpvbi5jb20g +SW5jLiBMPVNlYXR0bGUgU1Q9V2FzaGluZ3RvbiBDPVVTMB4XDTIyMDYyMTE2MzE0 +MVoXDTQ5MTIzMTIzNTk1OVowHjEcMBoGA1UEAwwTQVdTIElvVCBDZXJ0aWZpY2F0 +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANEwaKxOLdEdwTXrHokL +0CuIvyOXXsea9P+yHF2vZpkw1+j59rtfHQnldG/HY7OOHTHQ08fHmSsDKeUzrLlp +WhxyKkgp2KMRvTnzBeOWE1hZdUnuznz0pAMEsU26u1/FK2LR1IuMS/hK/QIFCodr +4bAQGsCfsZxOGUVH4sJ48CEJtTCaEy28hOz2pDuiggNren2uqeqcZfzCZek6IqnQ +fbRBkKWgPXj7y0rfpuFTc8wPEEA+zNjj9WWS3+wp+zpeTMHsyTqWDn9CVDt1VWM1 +uTaXQTNwRKPq0PDEGwdo9mvzZef0BH0oBznbP3U8b8/uigYBk3KaiP2RpI3RMrXv +uGsCAwEAAaNgMF4wHwYDVR0jBBgwFoAUjCvnk8NZKcKkLcQpOufgEVCh9sgwHQYD +VR0OBBYEFG4hj4FvVESNRBUETvg/ZtOhPMb/MAwGA1UdEwEB/wQCMAAwDgYDVR0P +AQH/BAQDAgeAMA0GCSqGSIb3DQEBCwUAA4IBAQCFnXUU2aT36RsMDXAK9+muxN9S +ZakzzrEzy9gfcZMh8aFURmOFKt8+pzxeRXBblJFcT0EiLIh3iJ0fos0dI2Smz+Qr +NYiKrInNl7u1ZBh0Ut7fqy3qhyi8M0NEGygAITNAAeLBekWQhcjthhtkB/OBCUMJ +EXigqtfGxiWbygEDbX6DwaEr8BZFC8Sdb91ApGVy7ilC5ysgXIIhZZNyfzELqVGD +UQ5mOivy1xLudQ/ta6P1EF+xZLVAboYnd8HniQKSwro+bkEtgJkIIUY6SwP9RbIJ +s5+Oetr2GAnj16fwGdnGFA773M/mybejo7jh+0PN77jCy3LSQz1oMBgZoCgS +-----END CERTIFICATE----- diff --git a/examples/mqtt/tls_mutual_auth/main/certs/client.key b/examples/mqtt/tls_mutual_auth/main/certs/client.key index f6ef61476..1874c1bb8 100644 --- a/examples/mqtt/tls_mutual_auth/main/certs/client.key +++ b/examples/mqtt/tls_mutual_auth/main/certs/client.key @@ -1 +1,27 @@ -Key goes here. \ No newline at end of file +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA0TBorE4t0R3BNeseiQvQK4i/I5dex5r0/7IcXa9mmTDX6Pn2 +u18dCeV0b8djs44dMdDTx8eZKwMp5TOsuWlaHHIqSCnYoxG9OfMF45YTWFl1Se7O +fPSkAwSxTbq7X8UrYtHUi4xL+Er9AgUKh2vhsBAawJ+xnE4ZRUfiwnjwIQm1MJoT +LbyE7PakO6KCA2t6fa6p6pxl/MJl6ToiqdB9tEGQpaA9ePvLSt+m4VNzzA8QQD7M +2OP1ZZLf7Cn7Ol5MwezJOpYOf0JUO3VVYzW5NpdBM3BEo+rQ8MQbB2j2a/Nl5/QE +fSgHOds/dTxvz+6KBgGTcpqI/ZGkjdEyte+4awIDAQABAoIBAQCwckOoKt1MiOuC +vkpoUHWLcvG+ZJyGgvbqgmKYxy0kQ+WqVsU98JE+2rMm4akAN/v6G+m2lm5ksGtz +L+4eCnX+jnGJovpfOcRBj/JEsD6rA/IBpZO4JGX/QcMIDFymAFqDJPhBqNofmajl +PjlOVrnr1vmpxkl6zQlVvbfQT1BEnDP/k/ESqu91Wx6lq/h++zr9wKMRouB05+wx +dXh0Vu8YYThq17a0wuKJgZMBwsdZG8tBiByahe2izKCqBQc5EOB62+XjZ6Vdweam +lB5JrA0+o7qDFoG9HeW3r6GNKEgGNKbynRw36l4KpEvDVlKTo0MVY7FoSbvYtw2G +/r4hqbNpAoGBAO+UmiztxnJctJ2wuYkYCHGGl9zz4DWB76KSRmGnae2HxCUM/1qC +5ZB6x3KkJWnCw0O4XyuibhNs/RhgbMS/N11RAcuk5mi0Qnmyb/Q8g8cae6LDMXvF +nXm7aWW2xhTlaftmnIY+GknuEFAx8PeXs1lE9funPk/aCSHKoilT4bafAoGBAN+G +mFwk9k7ggup1++0s48YXxapYbWyKeXijlXFkp+jzGuZ0ADgJQbRmPMcaHn8z1hb7 +kcf2YywpaecZTqUx+fJdUBxcfXnRuiP45imE73qbXV6xXyWx5+hPfYmNcVirx/14 +cfvKFs13uKianOg2bQRp0genJj42XRM0a8bnAia1AoGBAOHbbslCU13GxcMPMqnq +jTMxFoOHKWxoFVhmm5E+OV9jKWdgbG6Qjgh5pwMNefPNVzXFX3sMQsdvbN6JMKD+ +Pxxq3M11pH06ypxduqs+TWDrN3kGmOS9NKrpIgBwoSZb7GDDqHXpn/DoTFq96hQP +4FUc7OSUEutGemtv0xYw7M2zAoGAYJHr3VNyvp0sslMK5msQdRLsKS5PCCfDVyWF +eRn9mRz+sDXJqmwFYQUSmQplxpsEkaVaVEPUkM3LnTZHzgMLc3RxGe+znh/IBaaQ ++bsfI59f9Xs4GjcEdE9zcywSnjaJDq8sv5WNyHNC6kTyKTjqXI/DV98VHKesCpIJ +RkOjoEkCgYBsRWioZkxEAFchpgSERsFefXKMl+y462iI5ErALrJf487np5L5LrNp +tPZxF2xdZHGmYs22G30dvVQs6fKDOw3CYEKM/5MxncimiTYe+XFAIvwKMSENCLPP +7xzfj8LbxoifTo77UJDN6jnC++TFc+I1adt95CblRVQtbZcdZleFHw== +-----END RSA PRIVATE KEY----- diff --git a/examples/mqtt/tls_mutual_auth/main/certs/client_1.crt b/examples/mqtt/tls_mutual_auth/main/certs/client_1.crt new file mode 100644 index 000000000..9c6894f55 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/certs/client_1.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDWTCCAkGgAwIBAgIUCzOL9hd9bEjaaYfPhH7zLcHIc9wwDQYJKoZIhvcNAQEL +BQAwTTFLMEkGA1UECwxCQW1hem9uIFdlYiBTZXJ2aWNlcyBPPUFtYXpvbi5jb20g +SW5jLiBMPVNlYXR0bGUgU1Q9V2FzaGluZ3RvbiBDPVVTMB4XDTIyMDYxNzAxMjYz +MloXDTQ5MTIzMTIzNTk1OVowHjEcMBoGA1UEAwwTQVdTIElvVCBDZXJ0aWZpY2F0 +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALoPr+6smBk7EIGUV4fr +0Trwgazh9LKPUflGM+7tuwG+gRn2wxlHt3reFOh6U5wXDE9BYzjfoH2jce7m2JiS +tNx91wrWt+F9byHvKI2jvBPElF1a9OJjOLck+gchzJWY3tsnUpEk4Xewx0iifjwT +paBOteEnupQJqQ8uHsKXpwgFUX8ibSx1oEBtd1w58tAlx5NTsdvwJsudBv/m7pfJ +UvzUo7T6Ml5O5xRdxk66IxQothr+Eg1BWBXwHPWutDYXiByvSHX4saaWnvUIYG1n +2AGlJHHh7EAGNMVc6TvSzQ5oOnJQW5b2T31ZzxH7pFLOOw4YXZe9tg31C9w7m2r7 +B2sCAwEAAaNgMF4wHwYDVR0jBBgwFoAUR648FEHbMu9rnPjw4GOLvGUZnWIwHQYD +VR0OBBYEFGkwoed2DJOAIhmIOIjiaYU7/kANMAwGA1UdEwEB/wQCMAAwDgYDVR0P +AQH/BAQDAgeAMA0GCSqGSIb3DQEBCwUAA4IBAQAGl/2EvLSUw/M8pJ1ISWrSoF3q +1wZmWGJh5qdeRN9A5hIoBSfZdc4jKugjEAJ0BOs6XQtp9USgAfk1DKsn8QDGpRJv +9Oj29DyozEam6fHnOkxw2jFdEiZ8LWUkCy65rvSClenfn1q7Vf4aDGJaSQlFv1dI +DfTLaoa5Rbx4s3XlT/sHdcYb7LWS+Rib5SjdL1tOLIYMXDpgQy1dU39Brx6mS8Mp +0cJ59gO5DvFA+13Apf/j0PyNDVbX6j6tImAWeIgoJaqPhD7tT15yEGL8GlM1Ot/k +lZMNyCIJ8WBNQ/Ir8IIIjeCXtLcQV6bAFs47d/wQfin1PASwIZL+zeEoEqsT +-----END CERTIFICATE----- \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/certs/client_1.key b/examples/mqtt/tls_mutual_auth/main/certs/client_1.key new file mode 100644 index 000000000..d65bd8709 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/certs/client_1.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAug+v7qyYGTsQgZRXh+vROvCBrOH0so9R+UYz7u27Ab6BGfbD +GUe3et4U6HpTnBcMT0FjON+gfaNx7ubYmJK03H3XCta34X1vIe8ojaO8E8SUXVr0 +4mM4tyT6ByHMlZje2ydSkSThd7DHSKJ+PBOloE614Se6lAmpDy4ewpenCAVRfyJt +LHWgQG13XDny0CXHk1Ox2/Amy50G/+bul8lS/NSjtPoyXk7nFF3GTrojFCi2Gv4S +DUFYFfAc9a60NheIHK9Idfixppae9QhgbWfYAaUkceHsQAY0xVzpO9LNDmg6clBb +lvZPfVnPEfukUs47Dhhdl722DfUL3DubavsHawIDAQABAoIBAHDVkpvrj1G9wATV +MT/8gqZ7tChj30FqKQxCxrve78ne/ewA+dAa7epVQ9i4VnwSfiSSQtn6xBltRWrc +E/5y+bu9lOu9y0W0C/FIWA+NfEGAtXjn8Bpakcp/YiFxHeCbFItTDX94HkO3sSMb +Z/KG96qAEUy6Cyw7r4rFLWJVFbaGefxeE5zpBiI4PlGIqTcwiZbyH3J3XBb1iAN7 +DSy9Uc2HtER9nR/AWnNjvLVCA95HPvu+NMkJMl2jP3z+OSJW3MD5lDJpNoV7HLOR +A7sNc4y21M9WhWO2IQq4VEHo1OSu7ko6optyyKb3MmFQmlpp/xJ0tKFpQZmC1Mvl +YR5IRgECgYEA52M5JbSonArgvfWUCH0RKnRrjx0wSwaf5Jh/9ArUqbYa3iRJSFpe +8BvwLF6Ht+MOCY54/TYGbllo04cqnlR8FTkarHO2D9gqMV06z2RJpMnNlPO30vEe +BfOOP2VqrcIr5/cSKIcS4BPG7A7SXZ+QqKBa4nB46o4ACO8OLXhXcQ8CgYEAzdo1 +2kHRvNHMaV0fJbAUm9X33NdWO1EJTVJbvbH9U9+AdYMtmxa14GBS6IPJV5vNZZ/p +xGhZ3EtvmgXSeW7m5OzrCwei1QkNzisXAEMiBbQQpU09wLbR5DLunlT7joPc+RHb +zEXA/KTv463PBh0oYjg6V/h+SGPlddiaYfgky+UCgYEAz1OGxxRC596PVWYQCDMY +CHRcU9WRiggbiJZkP+TcfFxhYp4c06m122sNHpRtAwV9dG6mVivsQz/1v2Mjes60 +h8es8MyVW0kTNTwWD/IKUo9Hl6lxEp9diXOcB431sk+DS8uEB7BP51uRJ754G893 +rMBR7wdcML6fpWGd0Hw9zV0CgYBdmIe/0wlwFtwgVAzjb+oO6Pyn1ukIU1Ita/L2 +j0Ulq4uW4qwLxgmwGVvZCIK1aMu9FzaM7P1eopX9rAQE3p3xHJ/KBhDNWC6EvSba +IawId9TRtAsN4pIQde/04aL3K5F/VdVgTZ7vwHaqSM7Gct93uSUd9ohEldcw++lF +/jOM2QKBgQC+QdWjdUDtXtsphCqery/zqTwdvu4/rptzo0EmYnnHzuLeJfdGpxb/ +vJ2KuXEC0cflIRiuF2hMii8bYoALvMZUBGujcEhFtVzGGZb7gBeazgYth4TQS2Gb +pEhTr7rJtarptOXsUanjDk570zqLkqNsRWzBwvZKqP0LW75JLoVJHg== +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth.pem b/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth.pem index a6f3e92af..61ae256dd 100644 --- a/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth.pem +++ b/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth.pem @@ -17,4 +17,4 @@ N+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv o/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU 5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy rqXRfboQnoZsG4q5WTP468SQvvG5 ------END CERTIFICATE----- +-----END CERTIFICATE----- \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth_1.pem b/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth_1.pem new file mode 100644 index 000000000..61ae256dd --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/certs/root_cert_auth_1.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF +ADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6 +b24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL +MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv +b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj +ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM +9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw +IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6 +VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L +93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm +jgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC +AYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA +A4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI +U5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs +N+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv +o/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU +5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy +rqXRfboQnoZsG4q5WTP468SQvvG5 +-----END CERTIFICATE----- \ No newline at end of file diff --git a/examples/mqtt/tls_mutual_auth/main/mqtt_demo_mutual_auth.c b/examples/mqtt/tls_mutual_auth/main/mqtt_demo_mutual_auth.c index 78873f9b3..98d87e8c2 100644 --- a/examples/mqtt/tls_mutual_auth/main/mqtt_demo_mutual_auth.c +++ b/examples/mqtt/tls_mutual_auth/main/mqtt_demo_mutual_auth.c @@ -75,6 +75,25 @@ /* Clock for timer. */ #include "clock.h" +/* Include subscription manager. */ +#include "mqtt_subscription_manager.h" + +#include "app_mqtt_defines.h" +#include "esp_event_base.h" + +/* Queue used to send and receive complete Cloud messages structures. */ +QueueHandle_t xAppCloudMsgQueue = NULL; +APP_COMMUNICATION_STATES eAWSCommStates=ESTABLISH_BROKER_TLS_CONNC; +void on_wifi_disconnect_app(void *arg, esp_event_base_t event_base, + int32_t event_id, void *event_data) + { + eAWSCommStates=ESTABLISH_BROKER_TLS_CONNC; + } +void on_wifi_connect_app(void *esp_netif, esp_event_base_t event_base, + int32_t event_id, void *event_data) + { + + } /** * These configuration settings are required to run the mutual auth demo. * Throw compilation error if the below configs are not defined. @@ -189,9 +208,9 @@ * @brief The topic to subscribe and publish to in the example. * * The topic name starts with the client identifier to ensure that each demo - * interacts with a unique topic name. + * interacts with a unique topic name.//CLIENT_IDENTIFIER */ -#define MQTT_EXAMPLE_TOPIC CLIENT_IDENTIFIER "/example/topic" +#define MQTT_EXAMPLE_TOPIC "test/topic" /** * @brief Length of client MQTT topic. @@ -201,7 +220,7 @@ /** * @brief The MQTT message published in this example. */ -#define MQTT_EXAMPLE_MESSAGE "Hello World!" +#define MQTT_EXAMPLE_MESSAGE "Hello Teja!" /** * @brief The length of the MQTT message published in this example. @@ -280,30 +299,19 @@ /*-----------------------------------------------------------*/ -/** - * @brief Structure to keep the MQTT publish packets until an ack is received - * for QoS1 publishes. - */ -typedef struct PublishPackets -{ - /** - * @brief Packet identifier of the publish packet. - */ - uint16_t packetId; - - /** - * @brief Publish info of the publish packet. - */ - MQTTPublishInfo_t pubInfo; -} PublishPackets_t; /*-----------------------------------------------------------*/ +/** + * @brief Flag to represent whether that the humidity topic callback has been invoked. + */ +static bool globalReceivedHumidityData = false; + /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; * it is used to match received Subscribe ACK to the transmitted subscribe. */ -static uint16_t globalSubscribePacketIdentifier = 0U; +static uint16_t lastSubscribePacketIdentifier = 0U; /** * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; @@ -323,7 +331,7 @@ static PublishPackets_t outgoingPublishPackets[ MAX_OUTGOING_PUBLISHES ] = { 0 } * @brief Array to keep subscription topics. * Used to re-subscribe to topics that failed initial subscription attempts. */ -static MQTTSubscribeInfo_t pGlobalSubscriptionList[ 1 ]; +// static MQTTSubscribeInfo_t pGlobalSubscriptionList[ 1 ]; /** * @brief The network buffer must remain valid for the lifetime of the MQTT context. @@ -379,8 +387,8 @@ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext * * @return EXIT_FAILURE on failure; EXIT_SUCCESS on success. */ -static int subscribePublishLoop( MQTTContext_t * pMqttContext, - bool * pClientSessionPresent ); +// static int subscribePublishLoop( MQTTContext_t * pMqttContext, +// bool * pClientSessionPresent ); /** * @brief The function to handle the incoming publishes. @@ -388,8 +396,7 @@ static int subscribePublishLoop( MQTTContext_t * pMqttContext, * @param[in] pPublishInfo Pointer to publish info of the incoming publish. * @param[in] packetIdentifier Packet identifier of the incoming publish. */ -static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, - uint16_t packetIdentifier ); +static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo); /** * @brief The application callback function for getting the incoming publish @@ -443,15 +450,43 @@ static int establishMqttSession( MQTTContext_t * pMqttContext, static int disconnectMqttSession( MQTTContext_t * pMqttContext ); /** - * @brief Sends an MQTT SUBSCRIBE to subscribe to #MQTT_EXAMPLE_TOPIC - * defined at the top of the file. + * @brief Subscribes to the passed topic filter by sending an MQTT SUBSCRIBE + * packet and waiting for a SUBACK acknowledgement response from the broker. * * @param[in] pMqttContext MQTT context pointer. + * @param[in] pTopicFilter The topic filter to subscribe to. + * @param[in] topicFilterLength The length of the topic filter. * * @return EXIT_SUCCESS if SUBSCRIBE was successfully sent; * EXIT_FAILURE otherwise. */ -static int subscribeToTopic( MQTTContext_t * pMqttContext ); +static int subscribeToTopic( MQTTContext_t * pMqttContext, + const char * pTopicFilter, + uint16_t topicFilterLength ); + +/** + * @brief Utility to subscribe to the passed topic filter and register + * a callback for it in the subscription manager. + * + * The registered callback will be invoked by the subscription manager + * when PUBLISH messages on topic(s) that match the registered topic filter + * are received from the broker. + * + * @param[in] pContext The MQTT context representing the MQTT connection. + * @param[in] pTopicFilter The topic filter to subscribe to and register a + * callback for in the subscription manager. + * @param[in] topicFilterLength The length of the topic filter, @p pTopicFilter. + * @param[in] callback The callback to register for the topic filter with the + * subscription manager. + * + * @return EXIT_SUCCESS if subscription and callback registration operations + * for the topic filter were successfully; EXIT_FAILURE otherwise. + */ +static int subscribeToAndRegisterTopicFilter( MQTTContext_t * pContext, + const char * pTopicFilter, + uint16_t topicFilterLength, + SubscriptionManagerCallback_t callback ); + /** * @brief Sends an MQTT UNSUBSCRIBE to unsubscribe from @@ -533,10 +568,40 @@ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ); * * @param[in] pMqttContext MQTT context pointer. */ -static int handleResubscribe( MQTTContext_t * pMqttContext ); +static int handleResubscribe( MQTTContext_t * pMqttContext, + const char * pTopicFilter, + uint16_t topicFilterLength ); /*-----------------------------------------------------------*/ +/*Callbacks START*/ +static void humidityDataCallback( MQTTContext_t * pContext, + MQTTPublishInfo_t * pPublishInfo ) +{ + LogInfo( ( "Invoked humidity callback. 1" ) ); + assert( pPublishInfo != NULL ); + assert( pContext != NULL ); + /* Suppress unused parameter warning when asserts are disabled in build. */ + ( void ) pContext; + + LogInfo( ( "Invoked humidity callback. 2" ) ); + + /* Set the global flag to indicate that the humidity data has been received. */ + globalReceivedHumidityData = true; + // AppCloudMsg_t sAppCloudMsgSend; + // /* Send the entire structure to the queue created to hold 10 structures. */ + // xQueueSend( /* The handle of the queue. */ + // xAppCloudMsgQueue, + // /* The address of the xMessage variable. sizeof( struct AMessage ) + // bytes are copied from here into the queue. */ + // ( void * ) &sAppCloudMsgSend, + // /* Block time of 0 says don't block if the queue is already full. + // Check the value returned by xQueueSend() to know if the message + // was sent to the queue successfully. */ + // ( TickType_t ) 0 ); + handleIncomingPublish( pPublishInfo ); +} +/*Callback END*/ static uint32_t generateRandomNumber() { return( rand() ); @@ -649,6 +714,8 @@ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext ( unsigned short ) nextRetryBackOff ) ); Clock_SleepMs( nextRetryBackOff ); } + LogInfo( ( "tlsStatus %d", + tlsStatus) ); } } while( ( tlsStatus != TLS_TRANSPORT_SUCCESS ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) ); @@ -803,8 +870,7 @@ static int handlePublishResend( MQTTContext_t * pMqttContext ) /*-----------------------------------------------------------*/ -static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, - uint16_t packetIdentifier ) +static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo) { assert( pPublishInfo != NULL ); @@ -818,11 +884,11 @@ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, pPublishInfo->topicNameLength ) ) ) { LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\n" - "Incoming Publish message Packet Id is %u.\n" + // "Incoming Publish message Packet Id is %u.\n" "Incoming Publish Message : %.*s.\n\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName, - packetIdentifier, + // packetIdentifier, ( int ) pPublishInfo->payloadLength, ( const char * ) pPublishInfo->pPayload ) ); } @@ -856,16 +922,28 @@ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ) /*-----------------------------------------------------------*/ -static int handleResubscribe( MQTTContext_t * pMqttContext ) +static int handleResubscribe( MQTTContext_t * pMqttContext, + const char * pTopicFilter, + uint16_t topicFilterLength ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus = MQTTSuccess; BackoffAlgorithmStatus_t backoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t retryParams; uint16_t nextRetryBackOff = 0U; + MQTTSubscribeInfo_t pSubscriptionList[ 1 ]; assert( pMqttContext != NULL ); + /* Start with everything at 0. */ + ( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) ); + + /* This demo subscribes and publishes to topics at Qos1, so the publish + * messages received from the broker should have QoS1 as well. */ + pSubscriptionList[ 0 ].qos = MQTTQoS1; + pSubscriptionList[ 0 ].pTopicFilter = pTopicFilter; + pSubscriptionList[ 0 ].topicFilterLength = topicFilterLength; + /* Initialize retry attempts and interval. */ BackoffAlgorithm_InitializeParams( &retryParams, CONNECTION_RETRY_BACKOFF_BASE_MS, @@ -879,9 +957,9 @@ static int handleResubscribe( MQTTContext_t * pMqttContext ) * because this function is entered only after the receipt of a SUBACK, at which point * its associated packet id is free to use. */ mqttStatus = MQTT_Subscribe( pMqttContext, - pGlobalSubscriptionList, - sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), - globalSubscribePacketIdentifier ); + pSubscriptionList, + sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), + lastSubscribePacketIdentifier ); if( mqttStatus != MQTTSuccess ) { @@ -957,7 +1035,7 @@ static void eventCallback( MQTTContext_t * pMqttContext, { assert( pDeserializedInfo->pPublishInfo != NULL ); /* Handle incoming publish. */ - handleIncomingPublish( pDeserializedInfo->pPublishInfo, packetIdentifier ); + SubscriptionManager_DispatchHandler( pMqttContext, pDeserializedInfo->pPublishInfo ); } else { @@ -984,7 +1062,7 @@ static void eventCallback( MQTTContext_t * pMqttContext, } /* Make sure ACK packet identifier matches with Request packet identifier. */ - assert( globalSubscribePacketIdentifier == packetIdentifier ); + assert( lastSubscribePacketIdentifier == packetIdentifier ); break; case MQTT_PACKET_TYPE_UNSUBACK: @@ -1123,29 +1201,33 @@ static int disconnectMqttSession( MQTTContext_t * pMqttContext ) /*-----------------------------------------------------------*/ -static int subscribeToTopic( MQTTContext_t * pMqttContext ) +static int subscribeToTopic( MQTTContext_t * pMqttContext, + const char * pTopicFilter, + uint16_t topicFilterLength ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; + MQTTSubscribeInfo_t pSubscriptionList[ 1 ]; assert( pMqttContext != NULL ); /* Start with everything at 0. */ - ( void ) memset( ( void * ) pGlobalSubscriptionList, 0x00, sizeof( pGlobalSubscriptionList ) ); + ( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) ); - /* This example subscribes to only one topic and uses QOS1. */ - pGlobalSubscriptionList[ 0 ].qos = MQTTQoS1; - pGlobalSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC; - pGlobalSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; + /* This demo subscribes and publishes to topics at Qos1, so the publish + * messages received from the broker should have QoS1 as well. */ + pSubscriptionList[ 0 ].qos = MQTTQoS1; + pSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC; + pSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; /* Generate packet identifier for the SUBSCRIBE packet. */ - globalSubscribePacketIdentifier = MQTT_GetPacketId( pMqttContext ); + lastSubscribePacketIdentifier = MQTT_GetPacketId( pMqttContext ); /* Send SUBSCRIBE packet. */ mqttStatus = MQTT_Subscribe( pMqttContext, - pGlobalSubscriptionList, - sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), - globalSubscribePacketIdentifier ); + pSubscriptionList, + sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), + lastSubscribePacketIdentifier ); if( mqttStatus != MQTTSuccess ) { @@ -1156,8 +1238,48 @@ static int subscribeToTopic( MQTTContext_t * pMqttContext ) else { LogInfo( ( "SUBSCRIBE sent for topic %.*s to broker.\n\n", - MQTT_EXAMPLE_TOPIC_LENGTH, - MQTT_EXAMPLE_TOPIC ) ); + topicFilterLength, + pTopicFilter ) ); + } + + return returnStatus; +} + +static int subscribeToAndRegisterTopicFilter( MQTTContext_t * pContext, + const char * pTopicFilter, + uint16_t topicFilterLength, + SubscriptionManagerCallback_t callback ) +{ + int returnStatus = EXIT_SUCCESS; + SubscriptionManagerStatus_t managerStatus = ( SubscriptionManagerStatus_t ) 0u; + + /* Register the topic filter and its callback with subscription manager. + * On an incoming PUBLISH message whose topic name that matches the topic filter + * being registered, its callback will be invoked. */ + managerStatus = SubscriptionManager_RegisterCallback( pTopicFilter, + topicFilterLength, + callback ); + if( managerStatus != SUBSCRIPTION_MANAGER_SUCCESS ) + { + returnStatus = EXIT_FAILURE; + } + else + { + LogInfo( ( "Subscribing to the MQTT topic %.*s.", + topicFilterLength, + pTopicFilter ) ); + + returnStatus = subscribeToTopic( pContext, + pTopicFilter, + topicFilterLength ); + } + + if( returnStatus != EXIT_SUCCESS ) + { + /* Remove the registered callback for the temperature topic filter as + * the subscription operation for the topic filter did not succeed. */ + ( void ) SubscriptionManager_RemoveCallback( pTopicFilter, + topicFilterLength ); } return returnStatus; @@ -1169,25 +1291,25 @@ static int unsubscribeFromTopic( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; - + MQTTSubscribeInfo_t pSubscriptionList[ 1 ]; assert( pMqttContext != NULL ); /* Start with everything at 0. */ - ( void ) memset( ( void * ) pGlobalSubscriptionList, 0x00, sizeof( pGlobalSubscriptionList ) ); + ( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) ); /* This example subscribes to and unsubscribes from only one topic * and uses QOS1. */ - pGlobalSubscriptionList[ 0 ].qos = MQTTQoS1; - pGlobalSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC; - pGlobalSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; + pSubscriptionList[ 0 ].qos = MQTTQoS1; + pSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC; + pSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; /* Generate packet identifier for the UNSUBSCRIBE packet. */ globalUnsubscribePacketIdentifier = MQTT_GetPacketId( pMqttContext ); /* Send UNSUBSCRIBE packet. */ mqttStatus = MQTT_Unsubscribe( pMqttContext, - pGlobalSubscriptionList, - sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), + pSubscriptionList, + sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), globalUnsubscribePacketIdentifier ); if( mqttStatus != MQTTSuccess ) @@ -1291,7 +1413,7 @@ static int initializeMqtt( MQTTContext_t * pMqttContext, &transport, Clock_GetTimeMs, eventCallback, - &networkBuffer ); + &networkBuffer );//MQTT params are being set if( mqttStatus != MQTTSuccess ) { @@ -1304,197 +1426,77 @@ static int initializeMqtt( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ -static int subscribePublishLoop( MQTTContext_t * pMqttContext, - bool * pClientSessionPresent ) -{ - int returnStatus = EXIT_SUCCESS; - bool mqttSessionEstablished = false, brokerSessionPresent; - MQTTStatus_t mqttStatus = MQTTSuccess; - uint32_t publishCount = 0; - const uint32_t maxPublishCount = MQTT_PUBLISH_COUNT_PER_LOOP; - bool createCleanSession = false; - - assert( pMqttContext != NULL ); - assert( pClientSessionPresent != NULL ); - - /* A clean MQTT session needs to be created, if there is no session saved - * in this MQTT client. */ - createCleanSession = ( *pClientSessionPresent == true ) ? false : true; - - /* Establish MQTT session on top of TCP+TLS connection. */ - LogInfo( ( "Creating an MQTT connection to %.*s.", - AWS_IOT_ENDPOINT_LENGTH, - AWS_IOT_ENDPOINT ) ); - - /* Sends an MQTT Connect packet using the established TLS session, - * then waits for connection acknowledgment (CONNACK) packet. */ - returnStatus = establishMqttSession( pMqttContext, createCleanSession, &brokerSessionPresent ); - - - if( returnStatus == EXIT_SUCCESS ) - { - /* Keep a flag for indicating if MQTT session is established. This - * flag will mark that an MQTT DISCONNECT has to be sent at the end - * of the demo, even if there are intermediate failures. */ - mqttSessionEstablished = true; - - /* Update the flag to indicate that an MQTT client session is saved. - * Once this flag is set, MQTT connect in the following iterations of - * this demo will be attempted without requesting for a clean session. */ - *pClientSessionPresent = true; - - /* Check if session is present and if there are any outgoing publishes - * that need to resend. This is only valid if the broker is - * re-establishing a session which was already present. */ - if( brokerSessionPresent == true ) - { - LogInfo( ( "An MQTT session with broker is re-established. " - "Resending unacked publishes." ) ); - - /* Handle all the resend of publish messages. */ - returnStatus = handlePublishResend( pMqttContext ); - } - else - { - LogInfo( ( "A clean MQTT connection is established." - " Cleaning up all the stored outgoing publishes.\n\n" ) ); - - /* Clean up the outgoing publishes waiting for ack as this new - * connection doesn't re-establish an existing session. */ - cleanupOutgoingPublishes(); - } - } - - if( returnStatus == EXIT_SUCCESS ) - { - /* The client is now connected to the broker. Subscribe to the topic - * as specified in MQTT_EXAMPLE_TOPIC at the top of this file by sending a - * subscribe packet. This client will then publish to the same topic it - * subscribed to, so it will expect all the messages it sends to the broker - * to be sent back to it from the broker. This demo uses QOS1 in Subscribe, - * therefore, the Publish messages received from the broker will have QOS1. */ - LogInfo( ( "Subscribing to the MQTT topic %.*s.", - MQTT_EXAMPLE_TOPIC_LENGTH, - MQTT_EXAMPLE_TOPIC ) ); - returnStatus = subscribeToTopic( pMqttContext ); - } - - if( returnStatus == EXIT_SUCCESS ) - { - /* Process incoming packet from the broker. Acknowledgment for subscription - * ( SUBACK ) will be received here. However after sending the subscribe, the - * client may receive a publish before it receives a subscribe ack. Since this - * demo is subscribing to the topic to which no one is publishing, probability - * of receiving publish message before subscribe ack is zero; but application - * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to - * receive packet from network. */ - mqttStatus = MQTT_ProcessLoop( pMqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); - - if( mqttStatus != MQTTSuccess ) - { - returnStatus = EXIT_FAILURE; - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( mqttStatus ) ) ); - } - } - - /* Check if recent subscription request has been rejected. globalSubAckStatus is updated - * in eventCallback to reflect the status of the SUBACK sent by the broker. */ - if( ( returnStatus == EXIT_SUCCESS ) && ( globalSubAckStatus == MQTTSubAckFailure ) ) - { - /* If server rejected the subscription request, attempt to resubscribe to topic. - * Attempts are made according to the exponential backoff retry strategy - * implemented in retryUtils. */ - LogInfo( ( "Server rejected initial subscription request. Attempting to re-subscribe to topic %.*s.", - MQTT_EXAMPLE_TOPIC_LENGTH, - MQTT_EXAMPLE_TOPIC ) ); - returnStatus = handleResubscribe( pMqttContext ); - } - - if( returnStatus == EXIT_SUCCESS ) - { - /* Publish messages with QOS1, receive incoming messages and - * send keep alive messages. */ - for( publishCount = 0; publishCount < maxPublishCount; publishCount++ ) - { - LogInfo( ( "Sending Publish to the MQTT topic %.*s.", - MQTT_EXAMPLE_TOPIC_LENGTH, - MQTT_EXAMPLE_TOPIC ) ); - returnStatus = publishToTopic( pMqttContext ); - - /* Calling MQTT_ProcessLoop to process incoming publish echo, since - * application subscribed to the same topic the broker will send - * publish message back to the application. This function also - * sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS - * has expired since the last MQTT packet sent and receive - * ping responses. */ - mqttStatus = MQTT_ProcessLoop( pMqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); - - /* For any error in #MQTT_ProcessLoop, exit the loop and disconnect - * from the broker. */ - if( mqttStatus != MQTTSuccess ) - { - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( mqttStatus ) ) ); - returnStatus = EXIT_FAILURE; - break; - } - - LogInfo( ( "Delay before continuing to next iteration.\n\n" ) ); - - /* Leave connection idle for some time. */ - sleep( DELAY_BETWEEN_PUBLISHES_SECONDS ); - } - } - - if( returnStatus == EXIT_SUCCESS ) - { - /* Unsubscribe from the topic. */ - LogInfo( ( "Unsubscribing from the MQTT topic %.*s.", - MQTT_EXAMPLE_TOPIC_LENGTH, - MQTT_EXAMPLE_TOPIC ) ); - returnStatus = unsubscribeFromTopic( pMqttContext ); - } - - if( returnStatus == EXIT_SUCCESS ) - { - /* Process Incoming UNSUBACK packet from the broker. */ - mqttStatus = MQTT_ProcessLoop( pMqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); - - if( mqttStatus != MQTTSuccess ) - { - returnStatus = EXIT_FAILURE; - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( mqttStatus ) ) ); - } - } - - /* Send an MQTT Disconnect packet over the already connected TCP socket. - * There is no corresponding response for the disconnect packet. After sending - * disconnect, client must close the network connection. */ - if( mqttSessionEstablished == true ) - { - LogInfo( ( "Disconnecting the MQTT connection with %.*s.", - AWS_IOT_ENDPOINT_LENGTH, - AWS_IOT_ENDPOINT ) ); - - if( returnStatus == EXIT_FAILURE ) - { - /* Returned status is not used to update the local status as there - * were failures in demo execution. */ - ( void ) disconnectMqttSession( pMqttContext ); - } - else - { - returnStatus = disconnectMqttSession( pMqttContext ); - } - } - - /* Reset global SUBACK status variable after completion of subscription request cycle. */ - globalSubAckStatus = MQTTSubAckFailure; - - return returnStatus; -} +// static int subscribePublishLoop( MQTTContext_t * pMqttContext, +// bool * pClientSessionPresent ) +// { + + + + + +// if( returnStatus == EXIT_SUCCESS ) +// { +// /* Publish messages with QOS1, receive incoming messages and +// * send keep alive messages. */ +// for( publishCount = 0; publishCount < maxPublishCount; publishCount++ ) +// { + + +// LogInfo( ( "Delay before continuing to next iteration.\n\n" ) ); + +// /* Leave connection idle for some time. */ +// sleep( DELAY_BETWEEN_PUBLISHES_SECONDS ); +// } +// } + +// if( returnStatus == EXIT_SUCCESS ) +// { +// /* Unsubscribe from the topic. */ +// LogInfo( ( "Unsubscribing from the MQTT topic %.*s.", +// MQTT_EXAMPLE_TOPIC_LENGTH, +// MQTT_EXAMPLE_TOPIC ) ); +// returnStatus = unsubscribeFromTopic( pMqttContext ); +// } + +// if( returnStatus == EXIT_SUCCESS ) +// { +// /* Process Incoming UNSUBACK packet from the broker. */ +// mqttStatus = MQTT_ProcessLoop( pMqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); + +// if( mqttStatus != MQTTSuccess ) +// { +// returnStatus = EXIT_FAILURE; +// LogError( ( "MQTT_ProcessLoop returned with status = %s.", +// MQTT_Status_strerror( mqttStatus ) ) ); +// } +// } + +// /* Send an MQTT Disconnect packet over the already connected TCP socket. +// * There is no corresponding response for the disconnect packet. After sending +// * disconnect, client must close the network connection. */ +// if( mqttSessionEstablished == true ) +// { +// LogInfo( ( "Disconnecting the MQTT connection with %.*s.", +// AWS_IOT_ENDPOINT_LENGTH, +// AWS_IOT_ENDPOINT ) ); + +// if( returnStatus == EXIT_FAILURE ) +// { +// /* Returned status is not used to update the local status as there +// * were failures in demo execution. */ +// ( void ) disconnectMqttSession( pMqttContext ); +// } +// else +// { +// returnStatus = disconnectMqttSession( pMqttContext ); +// } +// } + +// /* Reset global SUBACK status variable after completion of subscription request cycle. */ +// globalSubAckStatus = MQTTSubAckFailure; + +// return returnStatus; +// } /*-----------------------------------------------------------*/ @@ -1521,6 +1523,12 @@ int aws_iot_demo_main( int argc, bool clientSessionPresent = false; struct timespec tp; + /*SUBPUB LOOP VARS START*/ + bool mqttSessionEstablished = false, brokerSessionPresent; + MQTTStatus_t mqttStatus = MQTTSuccess; + bool createCleanSession = false; + /*SUBPUB LOOP VARS END*/ + ( void ) argc; ( void ) argv; @@ -1534,45 +1542,239 @@ int aws_iot_demo_main( int argc, /* Initialize MQTT library. Initialization of the MQTT library needs to be * done only once in this demo. */ - returnStatus = initializeMqtt( &mqttContext, &xNetworkContext ); - - if( returnStatus == EXIT_SUCCESS ) - { + returnStatus = initializeMqtt( &mqttContext, &xNetworkContext );//Need to return success always in real app deploy + LogInfo( ( "returnStatus of initializeMqtt: %d",returnStatus ) ); + + /* Create the queue used to send complete struct AMessage structures. This can + also be created after the schedule starts, but care must be task to ensure + nothing uses the queue until after it has been created. */ + xAppCloudMsgQueue = xQueueCreate( + /* The number of items the queue can hold. */ + MAX_OUTGOING_PUBLISHES, + /* Size of each item is big enough to hold the + whole structure. */ + sizeof( AppCloudMsg_t ) ); + //need to enable this in wifi handler + // ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, WIFI_EVENT_STA_DISCONNECTED, &on_wifi_disconnect_app, NULL)); + // ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, WIFI_EVENT_STA_CONNECTED, &on_wifi_connect_app, netif)); + + // if( returnStatus == EXIT_SUCCESS ) + // { + eAWSCommStates=ESTABLISH_BROKER_TLS_CONNC; for( ; ; ) { - /* Attempt to connect to the MQTT broker. If connection fails, retry after - * a timeout. Timeout value will be exponentially increased till the maximum - * attempts are reached or maximum timeout value is reached. The function - * returns EXIT_FAILURE if the TCP connection cannot be established to - * broker after configured number of attempts. */ - returnStatus = connectToServerWithBackoffRetries( &xNetworkContext ); - if( returnStatus == EXIT_FAILURE ) + switch (eAWSCommStates) { - /* Log error to indicate connection failure after all - * reconnect attempts are over. */ - LogError( ( "Failed to connect to MQTT broker %.*s.", + case ESTABLISH_BROKER_TLS_CONNC: + /* Attempt to connect to the MQTT broker. If connection fails, retry after + * a timeout. Timeout value will be exponentially increased till the maximum + * attempts are reached or maximum timeout value is reached. The function + * returns EXIT_FAILURE if the TCP connection cannot be established to + * broker after configured number of attempts. */ + returnStatus = connectToServerWithBackoffRetries( &xNetworkContext ); + if( returnStatus == EXIT_FAILURE ) + { + /* Log error to indicate connection failure after all + * reconnect attempts are over. */ + LogError( ( "Failed to connect to MQTT broker %.*s.", + AWS_IOT_ENDPOINT_LENGTH, + AWS_IOT_ENDPOINT ) ); + } + else + { + eAWSCommStates=ESTABLISH_MQTT_CONNC; + } + break; + case ESTABLISH_MQTT_CONNC: + { + assert( &mqttContext != NULL ); + assert( &clientSessionPresent != NULL ); + + /* A clean MQTT session needs to be created, if there is no session saved + * in this MQTT client. */ + createCleanSession = ( clientSessionPresent == true ) ? false : true; + + /* Establish MQTT session on top of TCP+TLS connection. */ + LogInfo( ( "Creating an MQTT connection to %.*s.", AWS_IOT_ENDPOINT_LENGTH, AWS_IOT_ENDPOINT ) ); - } - else - { - /* If TLS session is established, execute Subscribe/Publish loop. */ - returnStatus = subscribePublishLoop( &mqttContext, &clientSessionPresent ); - } - if( returnStatus == EXIT_SUCCESS ) - { - /* Log message indicating an iteration completed successfully. */ - LogInfo( ( "Demo completed successfully." ) ); + /* Sends an MQTT Connect packet using the established TLS session, + * then waits for connection acknowledgment (CONNACK) packet. */ + returnStatus = establishMqttSession( &mqttContext, createCleanSession, &brokerSessionPresent ); + if(returnStatus==EXIT_SUCCESS) + eAWSCommStates=ACTION_ON_PEND_PUBLISHES; + break; + } + + case ACTION_ON_PEND_PUBLISHES: + { + /* Keep a flag for indicating if MQTT session is established. This + * flag will mark that an MQTT DISCONNECT has to be sent at the end + * of the demo, even if there are intermediate failures. */ + mqttSessionEstablished = true; + + /* Update the flag to indicate that an MQTT client session is saved. + * Once this flag is set, MQTT connect in the following iterations of + * this demo will be attempted without requesting for a clean session. */ + clientSessionPresent = true; + + /* Check if session is present and if there are any outgoing publishes + * that need to resend. This is only valid if the broker is + * re-establishing a session which was already present. */ + if( brokerSessionPresent == true ) + { + LogInfo( ( "An MQTT session with broker is re-established. " + "Resending unacked publishes." ) ); + + /* Handle all the resend of publish messages. */ + returnStatus = handlePublishResend( &mqttContext ); + } + else + { + LogInfo( ( "A clean MQTT connection is established." + " Cleaning up all the stored outgoing publishes.\n\n" ) ); + + /* Clean up the outgoing publishes waiting for ack as this new + * connection doesn't re-establish an existing session. */ + cleanupOutgoingPublishes(); + } + eAWSCommStates=SUBSCRIBE_TO_TOPICS; + } + break; + case SUBSCRIBE_TO_TOPICS: + //TODO: need to update this block for multiple topic subscribes + //need to check why wrong topic subscription is behaving wierdly + returnStatus = EXIT_SUCCESS;//need to update this only patch + if( returnStatus == EXIT_SUCCESS ) + { + /* The client is now connected to the broker. Subscribe to the topic + * as specified in MQTT_EXAMPLE_TOPIC at the top of this file by sending a + * subscribe packet. This client will then publish to the same topic it + * subscribed to, so it will expect all the messages it sends to the broker + * to be sent back to it from the broker. This demo uses QOS1 in Subscribe, + * therefore, the Publish messages received from the broker will have QOS1. */ + LogInfo( ( "Subscribing to the MQTT topic %.*s.", + MQTT_EXAMPLE_TOPIC_LENGTH, + MQTT_EXAMPLE_TOPIC ) ); + + // returnStatus = subscribeToTopic( pMqttContext, MQTT_EXAMPLE_TOPIC, MQTT_EXAMPLE_TOPIC_LENGTH ); + + /* Subscribe to a humidity topic filter, this time without any wildcard characters, so that we can + * receive incoming PUBLISH message only on the same topic from the broker. */ + returnStatus = subscribeToAndRegisterTopicFilter( &mqttContext, + MQTT_EXAMPLE_TOPIC, + MQTT_EXAMPLE_TOPIC_LENGTH, + humidityDataCallback ); + + } + if( returnStatus == EXIT_SUCCESS ) + { + /* Process incoming packet from the broker. Acknowledgment for subscription + * ( SUBACK ) will be received here. However after sending the subscribe, the + * client may receive a publish before it receives a subscribe ack. Since this + * demo is subscribing to the topic to which no one is publishing, probability + * of receiving publish message before subscribe ack is zero; but application + * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to + * receive packet from network. */ + mqttStatus = MQTT_ProcessLoop( &mqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); + if( mqttStatus != MQTTSuccess ) + { + returnStatus = EXIT_FAILURE; + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( mqttStatus ) ) ); + } + else if(globalSubAckStatus != MQTTSubAckFailure) + { + eAWSCommStates=PUBLISH_AND_RECV_FROM_CLOUD; + break; + } + } + /* Check if recent subscription request has been rejected. globalSubAckStatus is updated + * in eventCallback to reflect the status of the SUBACK sent by the broker. */ + if( ( returnStatus == EXIT_SUCCESS ) && ( globalSubAckStatus == MQTTSubAckFailure ) ) + { + /* If server rejected the subscription request, attempt to resubscribe to topic. + * Attempts are made according to the exponential backoff retry strategy + * implemented in retryUtils. */ + LogInfo( ( "Server rejected initial subscription request. Attempting to re-subscribe to topic %.*s.", + MQTT_EXAMPLE_TOPIC_LENGTH, + MQTT_EXAMPLE_TOPIC ) ); + returnStatus = handleResubscribe( &mqttContext, MQTT_EXAMPLE_TOPIC, MQTT_EXAMPLE_TOPIC_LENGTH ); + if(returnStatus==EXIT_SUCCESS) + { + eAWSCommStates=PUBLISH_AND_RECV_FROM_CLOUD; + } + } + break; + case PUBLISH_AND_RECV_FROM_CLOUD: + { + AppCloudMsg_t sAppCloudMsg; + // AppCloudMsg_t sAppCloudMsgSend; + if( xAppCloudMsgQueue != NULL ) + { + // /* Send the entire structure to the queue created to hold 10 structures. */ + // xQueueSend( /* The handle of the queue. */ + // xAppCloudMsgQueue, + // /* The address of the xMessage variable. sizeof( struct AMessage ) + // bytes are copied from here into the queue. */ + // ( void * ) &sAppCloudMsgSend, + // /* Block time of 0 says don't block if the queue is already full. + // Check the value returned by xQueueSend() to know if the message + // was sent to the queue successfully. */ + // ( TickType_t ) 0 ); + + /* Receive a message from the created queue to hold complex struct AMessage + structure. Block for 10 ticks if a message is not immediately available. + The value is read into a struct AMessage variable, so after calling + xQueueReceive() xRxedStructure will hold a copy of xMessage. */ + if( xQueueReceive( xAppCloudMsgQueue, + &( sAppCloudMsg ), + ( TickType_t )0 ) == pdPASS ) + { + LogInfo( ( "Sending Publish to the MQTT topic %.*s.", + MQTT_EXAMPLE_TOPIC_LENGTH, + MQTT_EXAMPLE_TOPIC ) ); + returnStatus = publishToTopic( &mqttContext ); + } + } + + + /* Calling MQTT_ProcessLoop to process incoming publish echo, since + * application subscribed to the same topic the broker will send + * publish message back to the application. This function also + * sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS + * has expired since the last MQTT packet sent and receive + * ping responses. */ + mqttStatus = MQTT_ProcessLoop( &mqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS ); + + /* For any error in #MQTT_ProcessLoop, exit the loop and disconnect + * from the broker. */ + if( mqttStatus != MQTTSuccess ) + { + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( mqttStatus ) ) ); + returnStatus = EXIT_FAILURE; + break; + } + LogInfo( ( "Short delay before starting the next iteration....\n" ) ); + sleep( MQTT_SUBPUB_LOOP_DELAY_SECONDS ); + break; + } + default: + break; } + // /* If TLS session is established, execute Subscribe/Publish loop. */ + // returnStatus = subscribePublishLoop( &mqttContext, &clientSessionPresent ); - /* End TLS session, then close TCP connection. */ - ( void ) xTlsDisconnect( &xNetworkContext ); + // /* End TLS session, then close TCP connection. */ + // ( void ) xTlsDisconnect( &xNetworkContext ); - LogInfo( ( "Short delay before starting the next iteration....\n" ) ); - sleep( MQTT_SUBPUB_LOOP_DELAY_SECONDS ); + // LogInfo( ( "Short delay before starting the next iteration....\n" ) ); + // sleep( MQTT_SUBPUB_LOOP_DELAY_SECONDS ); + sleep(1);//yielding to avoid task watchdog } - } + // } return returnStatus; } diff --git a/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.c b/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.c new file mode 100644 index 000000000..d25a8f2f2 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.c @@ -0,0 +1,218 @@ +/* + * AWS IoT Device SDK for Embedded C 202108.00 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +/** + * @file mqtt_subscription_manager.c + * @brief Implementation of the API of a subscription manager for handling subscription callbacks + * to topic filters in MQTT operations. + */ + +/* Standard includes. */ +#include +#include + +/* Include demo config. */ +#include "demo_config.h" + +/* Include header for the subscription manager. */ +#include "mqtt_subscription_manager.h" + + +/** + * @brief Represents a registered record of the topic filter and its associated callback + * in the subscription manager registry. + */ +typedef struct SubscriptionManagerRecord +{ + const char * pTopicFilter; + uint16_t topicFilterLength; + SubscriptionManagerCallback_t callback; +} SubscriptionManagerRecord_t; + +/** + * @brief The default value for the maximum size of the callback registry in the + * subscription manager. + */ +#ifndef MAX_SUBSCRIPTION_CALLBACK_RECORDS + #define MAX_SUBSCRIPTION_CALLBACK_RECORDS 5 +#endif + +/** + * @brief The registry to store records of topic filters and their subscription callbacks. + */ +static SubscriptionManagerRecord_t callbackRecordList[ MAX_SUBSCRIPTION_CALLBACK_RECORDS ] = { 0 }; + +/*-----------------------------------------------------------*/ + +void SubscriptionManager_DispatchHandler( MQTTContext_t * pContext, + MQTTPublishInfo_t * pPublishInfo ) +{ + bool matchStatus = false; + size_t listIndex = 0u; + + assert( pPublishInfo != NULL ); + assert( pContext != NULL ); + + /* Iterate through record list to find matching topics, and invoke their callbacks. */ + for( listIndex = 0; listIndex < MAX_SUBSCRIPTION_CALLBACK_RECORDS; listIndex++ ) + { + if( ( callbackRecordList[ listIndex ].pTopicFilter != NULL ) && + ( MQTT_MatchTopic( pPublishInfo->pTopicName, + pPublishInfo->topicNameLength, + callbackRecordList[ listIndex ].pTopicFilter, + callbackRecordList[ listIndex ].topicFilterLength, + &matchStatus ) == MQTTSuccess ) && + ( matchStatus == true ) ) + { + LogInfo( ( "Invoking subscription callback of matching topic filter: " + "TopicFilter=%.*s, TopicName=%.*s", + callbackRecordList[ listIndex ].topicFilterLength, + callbackRecordList[ listIndex ].pTopicFilter, + pPublishInfo->topicNameLength, + pPublishInfo->pTopicName ) ); + + /* Invoke the callback associated with the record as the topics match. */ + callbackRecordList[ listIndex ].callback( pContext, pPublishInfo ); + } + } +} + +/*-----------------------------------------------------------*/ + +SubscriptionManagerStatus_t SubscriptionManager_RegisterCallback( const char * pTopicFilter, + uint16_t topicFilterLength, + SubscriptionManagerCallback_t callback ) +{ + assert( pTopicFilter != NULL ); + assert( topicFilterLength != 0 ); + assert( callback != NULL ); + + SubscriptionManagerStatus_t returnStatus; + size_t availableIndex = MAX_SUBSCRIPTION_CALLBACK_RECORDS; + bool recordExists = false; + size_t index = 0u; + + /* Search for the first available spot in the list to store the record, and also check if + * a record for the topic filter already exists. */ + while( ( recordExists == false ) && ( index < MAX_SUBSCRIPTION_CALLBACK_RECORDS ) ) + { + /* Check if the index represents an empty spot in the registry. If we had already + * found an empty spot in the list, we will not update it. */ + if( ( availableIndex == MAX_SUBSCRIPTION_CALLBACK_RECORDS ) && + ( callbackRecordList[ index ].pTopicFilter == NULL ) ) + { + availableIndex = index; + } + + /* Check if the current record's topic filter in the registry matches the topic filter + * we are trying to register. */ + else if( ( callbackRecordList[ index ].topicFilterLength == topicFilterLength ) && + ( strncmp( pTopicFilter, callbackRecordList[ index ].pTopicFilter, topicFilterLength ) + == 0 ) ) + { + recordExists = true; + } + + index++; + } + + if( recordExists == true ) + { + /* The record for the topic filter already exists. */ + LogError( ( "Failed to register callback: Record for topic filter already exists: TopicFilter=%.*s", + topicFilterLength, + pTopicFilter ) ); + + returnStatus = SUBSCRIPTION_MANAGER_RECORD_EXISTS; + } + else if( availableIndex == MAX_SUBSCRIPTION_CALLBACK_RECORDS ) + { + /* The registry is full. */ + LogError( ( "Unable to register callback: Registry list is full: TopicFilter=%.*s, MaxRegistrySize=%u", + topicFilterLength, + pTopicFilter, + MAX_SUBSCRIPTION_CALLBACK_RECORDS ) ); + + returnStatus = SUBSCRIPTION_MANAGER_REGISTRY_FULL; + } + else + { + callbackRecordList[ availableIndex ].pTopicFilter = pTopicFilter; + callbackRecordList[ availableIndex ].topicFilterLength = topicFilterLength; + callbackRecordList[ availableIndex ].callback = callback; + + returnStatus = SUBSCRIPTION_MANAGER_SUCCESS; + + LogDebug( ( "Added callback to registry: TopicFilter=%.*s", + topicFilterLength, + pTopicFilter ) ); + } + + return returnStatus; +} + +/*-----------------------------------------------------------*/ + +void SubscriptionManager_RemoveCallback( const char * pTopicFilter, + uint16_t topicFilterLength ) +{ + assert( pTopicFilter != NULL ); + assert( topicFilterLength != 0 ); + + size_t index; + SubscriptionManagerRecord_t * pRecord = NULL; + + /* Iterate through the records list to find the matching record. */ + for( index = 0; index < MAX_SUBSCRIPTION_CALLBACK_RECORDS; index++ ) + { + pRecord = &callbackRecordList[ index ]; + + /* Only match the non-empty records. */ + if( pRecord->pTopicFilter != NULL ) + { + if( ( topicFilterLength == pRecord->topicFilterLength ) && + ( strncmp( pTopicFilter, pRecord->pTopicFilter, topicFilterLength ) == 0 ) ) + { + break; + } + } + } + + /* Delete the record by clearing the found entry in the records list. */ + if( index < MAX_SUBSCRIPTION_CALLBACK_RECORDS ) + { + pRecord->pTopicFilter = NULL; + pRecord->topicFilterLength = 0u; + pRecord->callback = NULL; + + LogDebug( ( "Deleted callback record for topic filter: TopicFilter=%.*s", + topicFilterLength, + pTopicFilter ) ); + } + else + { + LogWarn( ( "Attempted to remove callback for un-registered topic filter: TopicFilter=%.*s", + topicFilterLength, + pTopicFilter ) ); + } +} +/*-----------------------------------------------------------*/ diff --git a/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.h b/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.h new file mode 100644 index 000000000..ef7475813 --- /dev/null +++ b/examples/mqtt/tls_mutual_auth/main/mqtt_subscription_manager.h @@ -0,0 +1,155 @@ +/* + * AWS IoT Device SDK for Embedded C 202108.00 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +/** + * @file mqtt_subscription_manager.h + * @brief The API of a subscription manager for handling subscription callbacks + * to topic filters in MQTT operations. + */ + +#ifndef MQTT_SUBSCRIPTION_MANAGER_H_ +#define MQTT_SUBSCRIPTION_MANAGER_H_ + +/**************************************************/ +/******* DO NOT CHANGE the following order ********/ +/**************************************************/ + +/* Logging related header files are required to be included in the following order: + * 1. Include the header file "logging_levels.h". + * 2. Define LIBRARY_LOG_NAME and LIBRARY_LOG_LEVEL. + * 3. Include the header file "logging_stack.h". + */ + +/* Include header that defines log levels. */ +#include "logging_levels.h" + +/* Logging configuration for the Subscription Manager module. */ +#ifndef LIBRARY_LOG_NAME + #define LIBRARY_LOG_NAME "Subscription Manager" +#endif +#ifndef LIBRARY_LOG_LEVEL + #define LIBRARY_LOG_LEVEL LOG_DEBUG +#endif + +#include "logging_stack.h" + +/************ End of logging configuration ****************/ + +/* *INDENT-OFF* */ +#ifdef __cplusplus + extern "C" { +#endif +/* *INDENT-ON* */ + +/* Include MQTT library. */ +#include "core_mqtt.h" + +/* Enumeration type for return status value from Subscription Manager API. */ +typedef enum SubscriptionManagerStatus +{ + /** + * @brief Success return value from Subscription Manager API. + */ + SUBSCRIPTION_MANAGER_SUCCESS = 1, + + /** + * @brief Failure return value due to registry being full. + */ + SUBSCRIPTION_MANAGER_REGISTRY_FULL = 2, + + /** + * @brief Failure return value due to an already existing record in the + * registry for a new callback registration's requested topic filter. + */ + SUBSCRIPTION_MANAGER_RECORD_EXISTS = 3 +} SubscriptionManagerStatus_t; + + +/** + * @brief Callback type to be registered for a topic filter with the subscription manager. + * + * For incoming PUBLISH messages received on topics that match the registered topic filter, + * the callback would be invoked by the subscription manager. + * + * @param[in] pContext The context associated with the MQTT connection. + * @param[in] pPublishInfo The incoming PUBLISH message information. + */ +typedef void (* SubscriptionManagerCallback_t )( MQTTContext_t * pContext, + MQTTPublishInfo_t * pPublishInfo ); + +/** + * @brief Dispatches the incoming PUBLISH message to the callbacks that have their + * registered topic filters matching the incoming PUBLISH topic name. The dispatch + * handler will invoke all these callbacks with matching topic filters. + * + * @param[in] pContext The context associated with the MQTT connection. + * @param[in] pPublishInfo The incoming PUBLISH message information. + */ +void SubscriptionManager_DispatchHandler( MQTTContext_t * pContext, + MQTTPublishInfo_t * pPublishInfo ); + +/** + * @brief Utility to register a callback for a topic filter in the subscription manager. + * + * The callback will be invoked when an incoming PUBLISH message is received on + * a topic that matches the topic filter, @a pTopicFilter. The subscription manager + * accepts wildcard topic filters. + * + * @param[in] pTopicFilter The topic filter to register the callback for. + * @param[in] topicFilterLength The length of the topic filter string. + * @param[in] callback The callback to be registered for the topic filter. + * + * @note The subscription manager does not allow more than one callback to be registered + * for the same topic filter. + * @note The passed topic filter, @a pTopicFilter, is saved in the registry. + * The application must not free or alter the content of the topic filter memory + * until the callback for the topic filter is removed from the subscription manager. + * + * @return Returns one of the following: + * - #SUBSCRIPTION_MANAGER_SUCCESS if registration of the callback is successful. + * - #SUBSCRIPTION_MANAGER_REGISTRY_FULL if the registration failed due to registry + * being already full. + * - #SUBSCRIPTION_MANAGER_RECORD_EXISTS, if a registered callback already exists for + * the requested topic filter in the subscription manager. + */ +SubscriptionManagerStatus_t SubscriptionManager_RegisterCallback( const char * pTopicFilter, + uint16_t topicFilterLength, + SubscriptionManagerCallback_t pCallback ); + +/** + * @brief Utility to remove the callback registered for a topic filter from the + * subscription manager. + * + * @param[in] pTopicFilter The topic filter to remove from the subscription manager. + * @param[in] topicFilterLength The length of the topic filter string. + */ +void SubscriptionManager_RemoveCallback( const char * pTopicFilter, + uint16_t topicFilterLength ); + + +/* *INDENT-OFF* */ +#ifdef __cplusplus + } +#endif +/* *INDENT-ON* */ + +#endif /* ifndef MQTT_SUBSCRIPTION_MANAGER_H_ */