Skip to content

Made PGConnection thread-safe. #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 61 additions & 22 deletions Sources/PerfectPostgreSQL/PerfectPostgreSQL.swift
Original file line number Diff line number Diff line change
Expand Up @@ -327,74 +327,100 @@ public final class PGResult {

/// connection management class
public final class PGConnection {

/// Connection Status enum
public enum StatusType {
case ok
case bad
}


// Take care that conn is not thread-safe.
var conn = OpaquePointer(bitPattern: 0)
var connectInfo: String = ""


// Acquire this lock before accessing conn.
// Need to be recursive to support transaction.
private var lock = NSRecursiveLock()

/// empty init
public init() {

}

deinit {
close()
}

/// Makes a new connection to the database server.
public func connectdb(_ info: String) -> StatusType {
lock.lock()
defer { lock.unlock() }

conn = PQconnectdb(info)
connectInfo = info
return status()
}

/// Close db connection
public func close() {
finish()
}

/// Closes the connection to the server. Also frees memory used by the PGconn object.
public func finish() {
lock.lock()
defer { lock.unlock() }

if conn != nil {
PQfinish(conn)
conn = OpaquePointer(bitPattern: 0)
}
}

/// Returns the status of the connection.
public func status() -> StatusType {
lock.lock()
defer { lock.unlock() }

let status = PQstatus(conn)
return status == CONNECTION_OK ? .ok : .bad
}

/// Returns the error message most recently generated by an operation on the connection.
public func errorMessage() -> String {
lock.lock()
defer { lock.unlock() }

return String(validatingUTF8: PQerrorMessage(conn)) ?? ""
}

/// Submits a command to the server and waits for the result.
public func exec(statement: String) -> PGResult {
lock.lock()
defer { lock.unlock() }

return PGResult(PQexec(conn, statement))
}

/// Sends data to the server during COPY_IN state.
public func putCopyData(data: String) {
lock.lock()
defer { lock.unlock() }

PQputCopyData(self.conn, data, Int32(data.count))
}

/// Sends end-of-data indication to the server during COPY_IN state.
/// If withError is set, the copy is forced to fail with the error description supplied.
public func putCopyEnd(withError: String? = nil) -> PGResult {
lock.lock()
defer { lock.unlock() }

PQputCopyEnd(self.conn, withError)
let result = PGResult(PQgetResult(self.conn))
return result
}

// !FIX! does not handle binary data
/// Submits a command to the server and waits for the result, with the ability to pass parameters separately from the SQL command text.
public func exec(statement: String, params: [Any?]) -> PGResult {
Expand Down Expand Up @@ -456,10 +482,14 @@ public final class PGConnection {
formats[idx] = 0
}
}

lock.lock()
defer { lock.unlock() }

let r = PQexecParams(conn, statement, Int32(count), nil, values, lengths, formats, Int32(0))
return PGResult(r)
}

/// Assert that the connection status is OK
///
/// - throws: If the connection status is bad
Expand Down Expand Up @@ -498,13 +528,16 @@ public final class PGConnection {
throw PostgreSQLError.error("Failed to execute statement. status: \(status)")
}
}

/// Executes a BEGIN, calls the provided closure and executes a ROLLBACK if an exception occurs or a COMMIT if no exception occurs.
///
/// - parameter closure: Block to be executed inside transaction
/// - throws: If the provided closure fails
/// - returns: If successful then the return value from the `closure`
public func doWithTransaction<Result>(closure: () throws -> Result) throws -> Result {
lock.lock()
defer { lock.unlock() }

try ensureStatusIsOk()
try execute(statement: "BEGIN")
do {
Expand All @@ -516,23 +549,26 @@ public final class PGConnection {
throw error
}
}

/// Handler for receiving a PGResult
public typealias ReceiverProc = (PGResult) -> Void

/// Handler for processing a text message
public typealias ProcessorProc = (String) -> Void

/// internal callback for notice receiving
internal var receiver: ReceiverProc = { _ in }

/// internal callback for notice processing
internal var processor: ProcessorProc = { _ in }

/// Set a new notice receiver
/// - parameter handler: a closure to handle the incoming notice
/// - returns: a C convention function pointer; would be nil if failed to set.
public func setReceiver(_ handler: @escaping ReceiverProc) -> PQnoticeReceiver? {
lock.lock()
defer { lock.unlock() }

guard let cn = self.conn else {
return nil
}
Expand All @@ -547,11 +583,14 @@ public final class PGConnection {
this.receiver(pgresult)
}, me)
}

/// Set a new notice processor
/// - parameter handler: a closure to handle the incoming notice
/// - returns: a C convention function pointer; would be nil if failed to set.
public func setProcessor(_ handler: @escaping ProcessorProc) -> PQnoticeProcessor?{
lock.lock()
defer { lock.unlock() }

guard let cn = self.conn else {
return nil
}
Expand Down