Skip to content

Use correct safe collection concurrency primatives. #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .swiftformat
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Swift version
--swiftversion 5.8
--swiftversion 6.1

# file options
--exclude .build
Expand Down
24 changes: 3 additions & 21 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ let package = Package(
.package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/JohnSundell/CollectionConcurrencyKit", from: "0.2.0"),
.package(url: "https://github.com/nicklockwood/SwiftFormat", from: "0.53.9"),
.package(url: "https://github.com/apple/swift-syntax", from: "601.0.0"),
.package(url: "https://github.com/tachyonics/smockable", from: "0.3.0"),
.package(url: "https://github.com/tachyonics/smockable", from: "0.4.0"),
],
targets: [
.macro(name: "DynamoDBTablesMacros", dependencies: [
Expand All @@ -59,7 +57,6 @@ let package = Package(
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "AWSDynamoDB", package: "aws-sdk-swift"),
.product(name: "CollectionConcurrencyKit", package: "CollectionConcurrencyKit"),
.product(name: "Smockable", package: "smockable"),
],
swiftSettings: swiftSettings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ enum AttributeDifference: Equatable {
var path: String {
switch self {
case .update(path: let path, value: _):
return path
path
case let .remove(path: path):
return path
path
case .listAppend(path: let path, value: _):
return path
path
}
}
}
Expand Down Expand Up @@ -241,17 +241,17 @@ extension GenericAWSDynamoDBCompositePrimaryKeyTable {

private func combinePath(basePath: String?, newComponent: String) -> String {
if let basePath {
return "\(basePath).\"\(newComponent)\""
"\(basePath).\"\(newComponent)\""
} else {
return "\"\(newComponent)\""
"\"\(newComponent)\""
}
}

private func updateAttribute(newPath: String, attribute: DynamoDBClientTypes.AttributeValue) throws -> [AttributeDifference] {
if let newValue = try getFlattenedAttribute(attribute: attribute) {
return [.update(path: newPath, value: newValue)]
[.update(path: newPath, value: newValue)]
} else {
return [.remove(path: newPath)]
[.remove(path: newPath)]
}
}

Expand Down Expand Up @@ -310,9 +310,9 @@ extension GenericAWSDynamoDBCompositePrimaryKeyTable {
/// single quote by doubling it. E.g. 'foo'bar' becomes 'foo''bar'.
private func sanitizeString(_ string: String) -> String {
if self.tableConfiguration.escapeSingleQuoteInPartiQL {
return string.replacingOccurrences(of: "'", with: "''")
string.replacingOccurrences(of: "'", with: "''")
} else {
return string
string
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,16 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
{
let attributesFilterString = attributesFilter?.joined(separator: ", ") ?? "*"

let partitionWhereClause: String
if partitionKeys.count == 1 {
partitionWhereClause = "\(partitionKeyAttributeName)='\(partitionKeys[0])'"
let partitionWhereClause = if partitionKeys.count == 1 {
"\(partitionKeyAttributeName)='\(partitionKeys[0])'"
} else {
partitionWhereClause = "\(partitionKeyAttributeName) IN ['\(partitionKeys.joined(separator: "', '"))']"
"\(partitionKeyAttributeName) IN ['\(partitionKeys.joined(separator: "', '"))']"
}

let whereClausePostfix: String
if let additionalWhereClause {
whereClausePostfix = " \(additionalWhereClause)"
let whereClausePostfix = if let additionalWhereClause {
" \(additionalWhereClause)"
} else {
whereClausePostfix = ""
""
}

return """
Expand Down Expand Up @@ -125,7 +123,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
nextToken: nil)
}

return itemLists.flatMap { $0 }
return itemLists.flatMap(\.self)
}

func execute<AttributesType, ItemType, TimeToLiveAttributesType>(
Expand Down Expand Up @@ -193,7 +191,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
nextToken: nil)
}

return itemLists.flatMap { $0 }
return itemLists.flatMap(\.self)
}

// function to return a future with the results of an execute call and all future paginated calls
Expand Down Expand Up @@ -320,7 +318,7 @@ extension [DynamoDBTableError] {

// iterate through all errors
return self.compactMap { error in
return switch error {
switch error {
case .accessDenied:
canPassThrough(state: &seenAccessDenied) ? error : nil
case .internalServerError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
monitors the unprocessed items returned in the response from DynamoDB and uses an exponential backoff algorithm to retry those items using
the same retry configuration as the underlying DynamoDB client.
*/
private class GetItemsRetriable<AttributesType: PrimaryKeyAttributes, ItemType: Codable, TimeToLiveAttributesType: TimeToLiveAttributes, DynamoClient: DynamoDBClientProtocol> {
private class GetItemsRetriable<AttributesType: PrimaryKeyAttributes, ItemType: Codable & Sendable, TimeToLiveAttributesType: TimeToLiveAttributes, DynamoClient: DynamoDBClientProtocol> {
typealias OutputType = [CompositePrimaryKey<AttributesType>: TypedTTLDatabaseItem<AttributesType, ItemType, TimeToLiveAttributesType>]

let dynamodb: DynamoClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
//

import AWSDynamoDB
import CollectionConcurrencyKit
import Foundation
import Logging

Expand All @@ -38,7 +37,7 @@ public enum AWSDynamoDBLimits {
public static let maxStatementLength = 8192
}

private struct AWSDynamoDBPolymorphicWriteEntryTransform<Client: DynamoDBClientProtocol>: PolymorphicWriteEntryTransform {
private struct AWSDynamoDBPolymorphicWriteEntryTransform<Client: DynamoDBClientProtocol & Sendable>: PolymorphicWriteEntryTransform {
typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable<Client>

let statement: String
Expand All @@ -48,12 +47,12 @@ private struct AWSDynamoDBPolymorphicWriteEntryTransform<Client: DynamoDBClientP
}
}

private struct AWSDynamoDBPolymorphicTransactionConstraintTransform<Client: DynamoDBClientProtocol>: PolymorphicTransactionConstraintTransform {
private struct AWSDynamoDBPolymorphicTransactionConstraintTransform<Client: DynamoDBClientProtocol & Sendable>: PolymorphicTransactionConstraintTransform {
typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable<Client>

let statement: String

init(_ entry: TransactionConstraintEntry<some PrimaryKeyAttributes, some Codable, some TimeToLiveAttributes>,
init(_ entry: TransactionConstraintEntry<some PrimaryKeyAttributes, some Codable & Sendable, some TimeToLiveAttributes>,
table: TableType) throws
{
self.statement = try table.entryToStatement(entry)
Expand All @@ -75,34 +74,32 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
internal func entryToStatement(
_ entry: WriteEntry<some Any, some Any, some Any>) throws -> String
{
let statement: String
switch entry {
let statement: String = switch entry {
case let .update(new: new, existing: existing):
statement = try getUpdateExpression(tableName: self.targetTableName,
newItem: new,
existingItem: existing)
try getUpdateExpression(tableName: self.targetTableName,
newItem: new,
existingItem: existing)
case let .insert(new: new):
statement = try getInsertExpression(tableName: self.targetTableName,
newItem: new)
try getInsertExpression(tableName: self.targetTableName,
newItem: new)
case let .deleteAtKey(key: key):
statement = try getDeleteExpression(tableName: self.targetTableName,
existingKey: key)
try getDeleteExpression(tableName: self.targetTableName,
existingKey: key)
case let .deleteItem(existing: existing):
statement = try getDeleteExpression(tableName: self.targetTableName,
existingItem: existing)
try getDeleteExpression(tableName: self.targetTableName,
existingItem: existing)
}

return statement
}

internal func entryToStatement(
_ entry: TransactionConstraintEntry<some Any, some Any, some Any>) throws -> String
_ entry: TransactionConstraintEntry<some Any, some Sendable, some Any>) throws -> String
{
let statement: String
switch entry {
let statement: String = switch entry {
case let .required(existing: existing):
statement = getExistsExpression(tableName: self.targetTableName,
existingItem: existing)
getExistsExpression(tableName: self.targetTableName,
existingItem: existing)
}

return statement
Expand Down Expand Up @@ -236,11 +233,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {

var isTransactionConflict = false
let reasons = try zip(cancellationReasons, keys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in
let key: CompositePrimaryKey<AttributesType>?
if let item = cancellationReason.item {
key = try DynamoDBDecoder().decode(.m(item))
let key: CompositePrimaryKey<AttributesType>? = if let item = cancellationReason.item {
try DynamoDBDecoder().decode(.m(item))
} else {
key = nil
nil
}

let partitionKey = key?.partitionKey ?? entryKey.partitionKey
Expand Down Expand Up @@ -339,11 +335,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {

var isTransactionConflict = false
let reasons = try zip(cancellationReasons, inputKeys).compactMap { cancellationReason, entryKey -> DynamoDBTableError? in
let key: CompositePrimaryKey<AttributesType>?
if let item = cancellationReason.item {
key = try DynamoDBDecoder().decode(.m(item))
let key: CompositePrimaryKey<AttributesType>? = if let item = cancellationReason.item {
try DynamoDBDecoder().decode(.m(item))
} else {
key = nil
nil
}

let partitionKey = key?.partitionKey ?? entryKey.partitionKey
Expand Down Expand Up @@ -469,21 +464,20 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
}

let statements = try entries.map { entry -> DynamoDBClientTypes.BatchStatementRequest in
let statement: String
switch entry {
let statement: String = switch entry {
case let .update(new: new, existing: existing):
statement = try getUpdateExpression(tableName: self.targetTableName,
newItem: new,
existingItem: existing)
try getUpdateExpression(tableName: self.targetTableName,
newItem: new,
existingItem: existing)
case let .insert(new: new):
statement = try getInsertExpression(tableName: self.targetTableName,
newItem: new)
try getInsertExpression(tableName: self.targetTableName,
newItem: new)
case let .deleteAtKey(key: key):
statement = try getDeleteExpression(tableName: self.targetTableName,
existingKey: key)
try getDeleteExpression(tableName: self.targetTableName,
existingKey: key)
case let .deleteItem(existing: existing):
statement = try getDeleteExpression(tableName: self.targetTableName,
existingItem: existing)
try getDeleteExpression(tableName: self.targetTableName,
existingItem: existing)
}

return DynamoDBClientTypes.BatchStatementRequest(consistentRead: self.tableConfiguration.consistentRead, statement: statement)
Expand Down Expand Up @@ -515,15 +509,13 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
}
}

func bulkWriteWithFallback<AttributesType, ItemType, TimeToLiveAttributesType>(
func bulkWriteWithFallback<AttributesType, ItemType: Sendable, TimeToLiveAttributesType>(
_ entries: [WriteEntry<AttributesType, ItemType, TimeToLiveAttributesType>]) async throws
{
// fall back to singel operation if the write entry exceeds the statement length limitation
var bulkWriteEntries: [WriteEntry<AttributesType, ItemType, TimeToLiveAttributesType>] = []
let errors: [DynamoDBTableError] = try await entries.concurrentCompactMap { entry -> DynamoDBTableError? in
// fall back to single operation if the write entry exceeds the statement length limitation
let results: [Result<WriteEntry<AttributesType, ItemType, TimeToLiveAttributesType>, DynamoDBTableError>] = try await entries.concurrentMap { entry in
do {
try self.validateEntry(entry: entry)
bulkWriteEntries.append(entry)
} catch DynamoDBTableError.statementLengthExceeded {
do {
switch entry {
Expand All @@ -537,11 +529,22 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
try await self.deleteItem(existingItem: existing)
}
} catch let error as DynamoDBTableError {
return error
return .failure(error)
}
}

return nil
return .success(entry)
}

var bulkWriteEntries: [WriteEntry<AttributesType, ItemType, TimeToLiveAttributesType>] = []
var errors: [DynamoDBTableError] = []
for result in results {
switch result {
case let .success(entry):
bulkWriteEntries.append(entry)
case let .failure(error):
errors.append(error)
}
}

do {
Expand Down
Loading
Loading