Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions logger/fluent.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
#:cl-fluent-logger/logger/base)
(:import-from #:cl-fluent-logger/event-time
#:event-time)
(:import-from #:cl-fluent-logger/logger/fluent/socket
#:make-tcp-socket
#:make-unix-socket
#:socket-connected-p
#:socket-connect
#:socket-close
#:socket-stream)
(:import-from #:usocket)
(:import-from #:messagepack)
(:import-from #:chanl
Expand Down Expand Up @@ -39,6 +46,10 @@
:initarg :port
:initform 24224
:accessor fluent-logger-port)
(socket :type (or string pathname)
:initarg :socket
:initform nil
:accessor fluent-logger-socket)
(timeout :type number
:initarg :timeout
:initform 3.0
Expand All @@ -50,13 +61,19 @@

(connection-registry :initform (make-hash-table :test 'eq))))

(defmethod initialize-instance :before ((logger fluent-logger) &rest initargs &key host port socket &allow-other-keys)
(declare (ignore initargs))
(when (and (or host port) socket)
(error "Cannot specify :socket while :host or :port is specified")))

(defmethod initialize-instance :after ((logger fluent-logger) &rest initargs)
(declare (ignore initargs))
(with-slots (host port) logger
(unless host
(setf host "127.0.0.1"))
(unless port
(setf port 24224))))
(with-slots (host port socket) logger
(unless socket
(unless host
(setf host "127.0.0.1"))
(unless port
(setf port 24224)))))

(defun fluent-logger-connection (fluent-logger)
(with-slots (connection-registry) fluent-logger
Expand All @@ -71,21 +88,22 @@

(defmethod open-logger ((fluent-logger fluent-logger))
(let ((connection (fluent-logger-connection fluent-logger)))
(symbol-macrolet ((socket (fluent-connection-socket connection))
(socket-lock (fluent-connection-socket-lock connection))
(symbol-macrolet ((connection-socket (fluent-connection-socket connection))
(connection-socket-lock (fluent-connection-socket-lock connection))
(write-thread (fluent-connection-write-thread connection)))
(bt:with-recursive-lock-held (socket-lock)
(when socket
(bt:with-recursive-lock-held (connection-socket-lock)
(when connection-socket
(restart-case
(error "Socket is already opened.")
(close-logger ()
:report "Close the existing socket"
(close-logger fluent-logger))))
(with-slots (host port timeout) fluent-logger
(setf socket
(usocket:socket-connect host port
:element-type '(unsigned-byte 8)
:timeout timeout))))
(with-slots (host port socket timeout) fluent-logger
(setf connection-socket
(socket-connect
(if socket
(make-unix-socket :path socket :timeout timeout)
(make-tcp-socket :host host :port port :timeout timeout))))))
(when write-thread
(bt:destroy-thread write-thread))
(setf write-thread
Expand Down Expand Up @@ -116,7 +134,7 @@
(flush-buffer connection :infinite nil)
(bt:with-recursive-lock-held (socket-lock)
(ignore-errors
(usocket:socket-close socket))
(socket-close socket))
(setf socket nil))))
connection))

Expand Down Expand Up @@ -148,9 +166,10 @@
(buffer (fluent-connection-buffer connection)))
(handler-case
(progn
(unless socket
(unless (and socket
(socket-connected-p socket))
(error 'connection-not-established))
(let ((stream (usocket:socket-stream socket)))
(let ((stream (socket-stream socket)))
(loop
(when (and (not infinite)
(chanl:recv-blocks-p buffer))
Expand All @@ -166,7 +185,7 @@
(warn "Socket error: ~A" e)
(bt:with-lock-held (socket-lock)
(ignore-errors
(usocket:socket-close socket))
(socket-close socket))
(setf socket nil))

nil))))
Expand Down
72 changes: 72 additions & 0 deletions logger/fluent/socket.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
(defpackage #:cl-fluent-logger/logger/fluent/socket
(:use #:cl)
(:import-from #:usocket)
#+sbcl
(:import-from #:sb-bsd-sockets)
(:export #:socket
#:tcp-socket
#:unix-socket
#:make-tcp-socket
#:make-unix-socket
#:socket-connected-p
#:socket-connect
#:socket-close
#:socket-stream))
(in-package #:cl-fluent-logger/logger/fluent/socket)

(defstruct socket
handle
(timeout 3.0))

(defstruct (tcp-socket (:include socket)
(:conc-name socket-))
(host "127.0.0.1")
(port 24224))

(defstruct (unix-socket (:include socket)
(:conc-name socket-))
path)

(defun socket-connected-p (socket)
(not (null (socket-handle socket))))

(defgeneric socket-connect (socket)
(:method ((socket tcp-socket))
(setf (socket-handle socket)
(usocket:socket-connect (socket-host socket)
(socket-port socket)
:element-type '(unsigned-byte 8)
:timeout (socket-timeout socket)))
socket)
(:method ((socket unix-socket))
#-sbcl (error "Not supported Lisp to use UNIX domain socket")
#+sbcl
(let ((local-socket (make-instance 'sb-bsd-sockets:local-socket
:type :stream)))
(sb-bsd-sockets:socket-connect local-socket (socket-path socket))
(setf (socket-handle socket) local-socket))
socket))

(defgeneric socket-close (socket)
(:method ((socket tcp-socket))
(usocket:socket-close (socket-handle socket))
(setf (socket-handle socket) nil)
socket)
(:method ((socket unix-socket))
#-sbcl (error "Not supported Lisp to use UNIX domain socket")
#+sbcl
(sb-bsd-sockets:socket-close (socket-handle socket))
(setf (socket-handle socket) nil)
socket))

(defgeneric socket-stream (socket)
(:method ((socket tcp-socket))
(usocket:socket-stream (socket-handle socket)))
(:method ((socket unix-socket))
#-sbcl (error "Not supported Lisp to use UNIX domain socket")
#+sbcl
(sb-bsd-sockets:socket-make-stream (socket-handle socket)
:output t
:element-type '(unsigned-byte 8)
:timeout (socket-timeout socket)
:buffering :none)))