Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Source/FikaAmazonAPI.SampleCode/CustomMessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,21 @@ public void NewMessageRevicedTriger(NotificationMessageResponce message)
//Your Code here
}
}

public class CustomMessageRecieverWithResult : IMessageReceiverWithResult
{
public void ErrorCatch(Exception ex)
{
//Your code here
}

public bool NewMessageRevicedTriger(NotificationMessageResponce message)
{
Console.Write(".");
//Your Code here

// Return true to acknowledge and delete the message from queue, false to leave it in the queue.
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using FikaAmazonAPI.NotificationMessages;
using System;

namespace FikaAmazonAPI.Parameter.Notification
{
public interface IMessageReceiverWithResult
{
bool NewMessageRevicedTriger(NotificationMessageResponce message);
void ErrorCatch(Exception ex);
}
}
40 changes: 35 additions & 5 deletions Source/FikaAmazonAPI/Services/NotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using static FikaAmazonAPI.AmazonSpApiSDK.Models.Token.CacheTokenData;
using static FikaAmazonAPI.Utils.Constants;

Expand Down Expand Up @@ -117,7 +118,15 @@ public async Task<bool> DeleteSubscriptionByIdAsync(NotificationType notificatio
public void StartReceivingNotificationMessages(ParameterMessageReceiver param, IMessageReceiver messageReceiver, bool isDeleteNotificationAfterRead = true) =>
Task.Run(() => StartReceivingNotificationMessagesAsync(param, messageReceiver, isDeleteNotificationAfterRead)).ConfigureAwait(false).GetAwaiter().GetResult();

public async Task StartReceivingNotificationMessagesAsync(ParameterMessageReceiver param, IMessageReceiver messageReceiver, bool isDeleteNotificationAfterRead = true, CancellationToken cancellationToken = default)
public void StartReceivingNotificationMessages(ParameterMessageReceiver param, IMessageReceiverWithResult messageReceiver, bool isDeleteNotificationAfterRead = false) =>
Task.Run(() => StartReceivingNotificationMessagesAsync(param, messageReceiver, isDeleteNotificationAfterRead)).ConfigureAwait(false).GetAwaiter().GetResult();

public Task StartReceivingNotificationMessagesAsync(ParameterMessageReceiver param, IMessageReceiver messageReceiver, bool isDeleteNotificationAfterRead = true, CancellationToken cancellationToken = default)
{
return StartReceivingNotificationMessagesAsync(param, new MessageReceiverAdapter(messageReceiver), isDeleteNotificationAfterRead, cancellationToken);
}

public async Task StartReceivingNotificationMessagesAsync(ParameterMessageReceiver param, IMessageReceiverWithResult messageReceiver, bool isDeleteNotificationAfterRead = false, CancellationToken cancellationToken = default)
{
var awsAccessKeyId = param.awsAccessKeyId;
var awsSecretAccessKey = param.awsSecretAccessKey;
Expand Down Expand Up @@ -157,22 +166,23 @@ public async Task StartReceivingNotificationMessagesAsync(ParameterMessageReceiv
}
}

private async Task ProcessAnyOfferChangedMessage(Message msg, IMessageReceiver messageReceiver, AmazonSQSClient amazonSQSClient, string SQS_URL, bool isDeleteNotificationAfterRead = true, CancellationToken cancellationToken = default)
private async Task ProcessAnyOfferChangedMessage(Message msg, IMessageReceiverWithResult messageReceiver, AmazonSQSClient amazonSQSClient, string SQS_URL, bool isDeleteNotificationAfterRead = true, CancellationToken cancellationToken = default)
{
var deleteMessage = isDeleteNotificationAfterRead;
try
{
var data = DeserializeNotification(msg);

messageReceiver.NewMessageRevicedTriger(data);
deleteMessage = messageReceiver.NewMessageRevicedTriger(data) || isDeleteNotificationAfterRead;

if (isDeleteNotificationAfterRead)
if (deleteMessage)
await DeleteMessageFromQueueAsync(amazonSQSClient, SQS_URL, msg.ReceiptHandle, cancellationToken);
}
catch (Exception ex)
{
messageReceiver.ErrorCatch(ex);

if (isDeleteNotificationAfterRead)
if (deleteMessage)
await DeleteMessageFromQueueAsync(amazonSQSClient, SQS_URL, msg.ReceiptHandle, cancellationToken);
}
}
Expand All @@ -194,5 +204,25 @@ private NotificationMessageResponce DeserializeNotification(Message message)
return notification;
}

private class MessageReceiverAdapter : IMessageReceiverWithResult
{
private readonly IMessageReceiver _receiver;

public MessageReceiverAdapter(IMessageReceiver receiver)
{
_receiver = receiver;
}

public void ErrorCatch(Exception ex)
{
_receiver.ErrorCatch(ex);
}

public bool NewMessageRevicedTriger(NotificationMessageResponce message)
{
_receiver.NewMessageRevicedTriger(message);
return false;
}
}
}
}
Loading