Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion events/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ const (
KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed"
)

// KinesisFirehoseOTFOperation represents the operation to apply on the record during on-the-fly record routing.
type KinesisFirehoseOTFOperation string

const (
KinesisFirehoseOTFOperationInsert KinesisFirehoseOTFOperation = "insert"
KinesisFirehoseOTFOperationUpdate KinesisFirehoseOTFOperation = "update"
KinesisFirehoseOTFOperationDelete KinesisFirehoseOTFOperation = "delete"
)

type KinesisFirehoseResponse struct {
Records []KinesisFirehoseResponseRecord `json:"records"`
}
Expand All @@ -37,7 +46,14 @@ type KinesisFirehoseResponseRecord struct {
}

type KinesisFirehoseResponseRecordMetadata struct {
PartitionKeys map[string]string `json:"partitionKeys"`
PartitionKeys map[string]string `json:"partitionKeys"`
OTFMetadata KinesisFirehoseResponseRecordOTFMetadata `json:"otfMetadata"`
}

type KinesisFirehoseResponseRecordOTFMetadata struct {
DestinationDatabaseName string `json:"destinationDatabaseName"`
DestinationTableName string `json:"destinationTableName"`
Operation KinesisFirehoseOTFOperation `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete.
}

type KinesisFirehoseRecordMetadata struct {
Expand Down
75 changes: 45 additions & 30 deletions events/testdata/kinesis-firehose-response.json
Original file line number Diff line number Diff line change
@@ -1,31 +1,46 @@
{
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"result": "TRANSFORMED_STATE_OK",
"metadata": {
"partitionKeys": {}
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record2",
"result": "TRANSFORMED_STATE_DROPPED",
"metadata": {
"partitionKeys": {}
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record3",
"result": "TransformedStateOk",
"metadata": {
"partitionKeys": {
"iamKey1": "iamValue1",
"iamKey2": "iamValue2"
}
}
}
]
}
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"result": "TRANSFORMED_STATE_OK",
"metadata": {
"partitionKeys": {},
"otfMetadata": {
"destinationTableName": "",
"destinationDatabaseName": "",
"operation": ""
}
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record2",
"result": "TRANSFORMED_STATE_DROPPED",
"metadata": {
"partitionKeys": {},
"otfMetadata": {
"destinationTableName": "",
"destinationDatabaseName": "",
"operation": ""
}
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record3",
"result": "TransformedStateOk",
"metadata": {
"partitionKeys": {
"iamKey1": "iamValue1",
"iamKey2": "iamValue2"
},
"otfMetadata": {
"destinationTableName": "table1",
"destinationDatabaseName": "database1",
"operation": "update"
}
}
}
]
}