-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add sqs notification plugin #115
Conversation
rzlim08
commented
Aug 14, 2023
- Adds an SQS queue and miniwdl plugin to send a message to the SQS queue after every step completes
- Runs on every step, regardless if it's a download step or not
- Sends the Workflow name, step name, and mocks an execution id right now
- Still in draft
@@ -46,6 +46,7 @@ RUN apt-get -q update && apt-get -q install -y \ | |||
# upgrade because of this issue https://github.com/chanzuckerberg/miniwdl/issues/607 in miniwdl | |||
RUN pip3 install importlib-metadata==4.13.0 | |||
RUN pip3 install miniwdl==${MINIWDL_VERSION} | |||
RUN pip3 install urllib3==1.26.16 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed for tests to pass
"TaskName": {"DataType": "String", "StringValue": run_id[-1]}, | ||
"ExecutionId": { | ||
"DataType": "String", | ||
"StringValue": "execution_id_to_be_passed_in", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Figure out how to actually get the execution id
logger = logger.getChild("s3_progressive_upload") | ||
|
||
# ignore inputs | ||
recv = yield recv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to get notifications for either the inputs/command?
on completion of any task, upload its output files to S3, and record the S3 URI corresponding | ||
to each local file (keyed by inode) in _uploaded_files | ||
""" | ||
logger = logger.getChild("s3_progressive_upload") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO - Ryan: rename
# ignore inputs | ||
recv = yield recv | ||
|
||
yield recv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to send a message when the workflow is finished?
// Sent to dead-letter queue after maxReceiveCount tries | ||
redrive_policy = lookup(each.value, "dead_letter", "true") == "true" ? jsonencode({ | ||
deadLetterTargetArn = aws_sqs_queue.sfn_notifications_queue_dead_letter[each.key].arn | ||
maxReceiveCount = 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO - Ryan: raise this value
@@ -44,7 +44,16 @@ module "batch_job" { | |||
docker_network = var.docker_network | |||
call_cache = var.call_cache | |||
output_status_json_files = var.output_status_json_files | |||
tags = var.tags | |||
sqs_queues = { | |||
"step" : { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass this into SWIPE?
@@ -15,6 +15,8 @@ module "swipetest" { | |||
"AWS_ENDPOINT_URL" : "http://awsnet:5000", | |||
"AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" : "container-credentials-relative-uri", | |||
"S3PARCP_S3_URL" : "http://awsnet:5000", | |||
"AWS_STEP_NOTIFICATION_PLUGIN" : "http://localhost:9000/123456789012/swipe-test-step-sfn-notifications-queue" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably use a variable to set this
@@ -291,23 +301,48 @@ def tearDown(self) -> None: | |||
) | |||
self.test_bucket.delete() | |||
|
|||
def retrieve_message(self, url) -> str: | |||
""" Retrieve a single SQS message and delete it from queue""" | |||
resp = self.sqs.receive_message( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add attributes