Skip to content

Commit

Permalink
Retry connection after getting disconnected from server. (#168)
Browse files Browse the repository at this point in the history
* Set TCP keep alive and reset connection configurations for devices

* update version

* updated onConnectionResumed method

* remove tcp keep alive

* Updated log statements

* updated version.
  • Loading branch information
HarshGandhi-AWS authored Sep 3, 2021
1 parent 1988dbc commit 679afbc
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
47 changes: 45 additions & 2 deletions source/SharedCrtResourceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ using namespace Aws::Iot::DeviceClient::Logging;

constexpr int SharedCrtResourceManager::DEFAULT_WAIT_TIME_SECONDS;

bool SharedCrtResourceManager::initialize(const PlainConfig &config)
bool SharedCrtResourceManager::initialize(const PlainConfig &config, vector<Feature *> *featuresList)
{
features = featuresList;
initializeAllocator();
initialized = buildClient(config) == SharedCrtResourceManager::SUCCESS;
return initialized;
Expand Down Expand Up @@ -167,6 +168,7 @@ int SharedCrtResourceManager::establishConnection(const PlainConfig &config)
clientConfigBuilder.WithCertificateAuthority(config.rootCa->c_str());
clientConfigBuilder.WithSdkName(SharedCrtResourceManager::BINARY_NAME);
clientConfigBuilder.WithSdkVersion(SharedCrtResourceManager::BINARY_VERSION);

auto clientConfig = clientConfigBuilder.Build();

if (!clientConfig)
Expand Down Expand Up @@ -222,11 +224,43 @@ int SharedCrtResourceManager::establishConnection(const PlainConfig &config)
}
};

/*
* Invoked when connection is interrupted.
*/
auto OnConnectionInterrupted = [&](Mqtt::MqttConnection &, int errorCode) {
{
if (errorCode)
{
LOGM_ERROR(
TAG,
"MQTT Connection interrupted with error: `%s`. Device Client will retry connection until it is "
"successfully connected to the core. ",
ErrorDebugString(errorCode));
}
}
};

/*
* Invoked when connection is resumed.
*/
auto OnConnectionResumed = [&](Mqtt::MqttConnection &, int returnCode, bool) {
{
LOGM_INFO(TAG, "MQTT connection resumed with return code: %d", returnCode);
}
};

connection->OnConnectionCompleted = move(onConnectionCompleted);
connection->OnDisconnect = move(onDisconnect);
connection->OnConnectionInterrupted = move(OnConnectionInterrupted);
connection->OnConnectionResumed = move(OnConnectionResumed);

LOGM_INFO(TAG, "Establishing MQTT connection with client id %s...", config.thingName->c_str());
if (!connection->Connect(config.thingName->c_str(), true, 0))
if (!connection->SetReconnectTimeout(15, 240))
{
LOG_ERROR(TAG, "Device Client is not able to set reconnection settings. Device Client will retry again.");
return RETRY;
}
if (!connection->Connect(config.thingName->c_str(), false))
{
LOGM_ERROR(TAG, "MQTT Connection failed with error: %s", ErrorDebugString(connection->LastError()));
return RETRY;
Expand Down Expand Up @@ -306,3 +340,12 @@ void SharedCrtResourceManager::disconnect()
LOG_ERROR(TAG, "MQTT Connection failed to disconnect");
}
}

void SharedCrtResourceManager::startDeviceClientFeatures()
{
LOG_INFO(TAG, "Starting Device Client features.");
for (auto *feature : *features)
{
feature->start();
}
}
5 changes: 4 additions & 1 deletion source/SharedCrtResourceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#ifndef DEVICE_CLIENT_SHAREDCRTRESOURCEMANAGER_H
#define DEVICE_CLIENT_SHAREDCRTRESOURCEMANAGER_H

#include "Feature.h"
#include "config/Config.h"

#include <atomic>
Expand Down Expand Up @@ -41,6 +42,7 @@ namespace Aws
std::unique_ptr<Aws::Iot::MqttClient> mqttClient;
std::shared_ptr<Crt::Mqtt::MqttConnection> connection;
struct aws_allocator *allocator;
std::vector<Feature *> *features;

bool locateCredentials(const PlainConfig &config);
int buildClient(const PlainConfig &config);
Expand All @@ -50,9 +52,10 @@ namespace Aws
static const int SUCCESS = 0;
static const int RETRY = 1;
static const int ABORT = 2;
bool initialize(const PlainConfig &config);
bool initialize(const PlainConfig &config, std::vector<Feature *> *features);
void initializeAWSHttpLib();
int establishConnection(const PlainConfig &config);
void startDeviceClientFeatures();
std::shared_ptr<Crt::Mqtt::MqttConnection> getConnection();
Aws::Crt::Io::EventLoopGroup *getEventLoopGroup();
struct aws_allocator *getAllocator();
Expand Down
11 changes: 4 additions & 7 deletions source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ int main(int argc, char *argv[])
shared_ptr<DefaultClientBaseNotifier> listener =
shared_ptr<DefaultClientBaseNotifier>(new DefaultClientBaseNotifier);
resourceManager = shared_ptr<SharedCrtResourceManager>(new SharedCrtResourceManager);
if (!resourceManager.get()->initialize(config.config))
if (!resourceManager.get()->initialize(config.config, &features))
{
LOGM_ERROR(
TAG,
Expand All @@ -300,7 +300,7 @@ int main(int argc, char *argv[])
{

/*
* Establish MQTT connection using claim certificates and private key to provision device/thing.
* Establish MQTT connection using claim certificates and private key to provision the device/thing.
*/
# if !defined(DISABLE_MQTT)
attemptConnection();
Expand All @@ -316,7 +316,7 @@ int main(int argc, char *argv[])
{
LOGM_ERROR(
TAG,
"*** %s: Failed to Provision thing or Validate newly created resources. "
"*** %s: Failed to Provision thing or validate newly created resources. "
"Please verify your AWS IoT credentials, "
"configuration, Fleet Provisioning Template, claim certificate and policy used. ***",
DC_FATAL_ERROR);
Expand Down Expand Up @@ -428,10 +428,7 @@ int main(int argc, char *argv[])
# endif
#endif

for (auto &feature : features)
{
feature->start();
}
resourceManager->startDeviceClientFeatures();
featuresReadWriteLock.unlock(); // UNLOCK

// Now allow this thread to sleep until it's interrupted by a signal
Expand Down
2 changes: 1 addition & 1 deletion source/util/Retry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool Retry::exponentialBackoff(

if (config.maxRetries < 0)
{
LOG_DEBUG(TAG, "Retryable function will retry until success");
LOG_DEBUG(TAG, "Retryable function starting, it will retry until success");
}

bool successful = false;
Expand Down

0 comments on commit 679afbc

Please sign in to comment.