diff --git a/internal/libooniengine/engine.go b/internal/libooniengine/engine.go deleted file mode 100644 index 3a7fd496bb..0000000000 --- a/internal/libooniengine/engine.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -// -// C API -// - -//#include -// -//#include "engine.h" -import "C" - -import ( - "unsafe" - - "github.com/ooni/probe-cli/v3/internal/version" -) - -//export OONIEngineVersion -func OONIEngineVersion() *C.char { - return C.CString(version.Version) -} - -//export OONIEngineFreeMemory -func OONIEngineFreeMemory(ptr *C.void) { - C.free(unsafe.Pointer(ptr)) -} - -func main() { - // do nothing -} diff --git a/internal/libooniengine/engine.h b/internal/libooniengine/engine.h index 90f1b560e9..66c417bfdd 100644 --- a/internal/libooniengine/engine.h +++ b/internal/libooniengine/engine.h @@ -7,6 +7,14 @@ /// C API for using the OONI engine. /// +#include + +/// OONITask is an asynchronous thread of execution managed by the OONI +/// engine that performs a background operation and emits interim outpus +/// like logs and progress and results of the operation with meaningful +/// events such as, for example, the results of measurements. +typedef intptr_t OONITask; + #ifdef __cplusplus extern "C" { #endif @@ -19,7 +27,90 @@ char *OONIEngineVersion(void); /// OONIEngineFreeMemory frees the memory allocated by the engine. /// /// @param ptr a void pointer refering to the memory to be freed. -void OONIENgineFreeMemory(void *ptr); +void OONIEngineFreeMemory(void *ptr); + +/// OONIEngineCall starts a new OONITask using the given @p req. +/// +/// @param req A JSON string, owned by the caller, that contains +/// the configuration for the task to start. +/// +/// @return Zero on failure, nonzero on success. If the return value +/// is nonzero, a task is running. In such a case, the caller is +/// responsible to eventually dispose of the task using OONIEngineFreeTask. +OONITask OONIEngineCall(char *req); + +/// OONIEngineWaitForNextEvent awaits on the @p task event queue until +/// a new event is available or the given @p timeout expires. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @param timeout Timeout in milliseconds. If the timeout is zero +/// or negative, this function would potentially block forever. +/// +/// @return A NULL pointer on failure, non-NULL JSON string otherwise. +/// If the return value is non-NULL, the caller takes ownership of the +/// char pointer and MUST free it using OONIEngineFreeMemory when done +/// using it. +/// +/// This function will return a NULL pointer: +/// +/// 1. when the timeout expires; +/// +/// 2. if @p task is done; +/// +/// 3. if @p task is zero or does not refer to a valid task; +/// +/// 4. if we cannot JSON serialize the message; +/// +/// 5. possibly because of other unknown internal errors. +/// +/// In short, you cannot reliably determine whether a task is done by +/// checking whether this function has returned an empty string. +char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout); + +/// OONIEngineTaskGetResult awaits on the result queue until the final +/// result is available. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @return A NULL pointer on failure, non-NULL JSON string otherwise. +/// If the return value is non-NULL, the caller takes ownership of the +/// char pointer and MUST free it using OONIEngineFreeMemory when done +/// using it. +/// +/// This function will return a NULL pointer: +/// +/// 1. if @p task is zero or does not refer to a valid task; +/// +/// 2. if we cannot JSON serialize the message; +/// +/// 3. possibly because of other unknown internal errors. +/// +/// In short, you cannot reliably determine whether a task is done by +/// checking whether this function has returned an empty string. +char *OONIEngineTaskGetResult(OONITask task); + +/// OONIEngineTaskIsDone returns whether @p task is done. A task is +/// done when it has finished running _and_ its events queue has been drained. +/// +/// @param task Task handle returned by OONIEngineCall. +/// +/// @return Nonzero if the task exists and either is still running or has some +/// unread events inside its events queue, zero otherwise. +uint8_t OONIEngineTaskIsDone(OONITask task); + +/// OONIEngineInterruptTask tells @p task to stop ASAP. +/// +/// @param task Task handle returned by OONIEngineCall. If task is zero +/// or does not refer to a valid task, this function will just do nothing. +void OONIEngineInterruptTask(OONITask task); + +/// OONIEngineFreeTask frees the memory associated with @p task. If the task is +/// still running, this function will also interrupt it. +/// +/// @param task Task handle returned by OONIEngineCall. If task is zero +/// or does not refer to a valid task, this function will just do nothing. +void OONIEngineFreeTask(OONITask task); #ifdef __cplusplus } diff --git a/internal/libooniengine/handle.go b/internal/libooniengine/handle.go new file mode 100644 index 0000000000..12cc88d16e --- /dev/null +++ b/internal/libooniengine/handle.go @@ -0,0 +1,90 @@ +package main + +// +// Handle mimics cgo.Handle but uses a intptr +// + +//#include +// +//#include "engine.h" +import "C" + +import ( + "errors" + "log" + "sync" + + "github.com/ooni/probe-cli/v3/internal/motor" +) + +var ( + handler Handler + + // errHandleMisuse indicates that an invalid handle was misused + errHandleMisuse = errors.New("misuse of a invalid handle") + + // errHandleSpaceExceeded + errHandleSpaceExceeded = errors.New("ran out of handle space") +) + +func init() { + handler = Handler{ + handles: make(map[Handle]interface{}), + } +} + +type Handle C.intptr_t + +// Handler handles the entirety of handle operations. +type Handler struct { + handles map[Handle]interface{} + handleIdx Handle + mu sync.Mutex +} + +// newHandle returns a handle for a given value +// if we run out of handle space, a zero handle is returned. +func (h *Handler) newHandle(v any) (Handle, error) { + h.mu.Lock() + defer h.mu.Unlock() + ptr := C.intptr_t(h.handleIdx) + newId := ptr + 1 + if newId < 0 { + return Handle(0), errHandleSpaceExceeded + } + h.handleIdx = Handle(newId) + h.handles[h.handleIdx] = v + return h.handleIdx, nil +} + +// delete invalidates a handle +func (h *Handler) delete(hd Handle) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.handles, hd) +} + +// value returns the associated go value for a valid handle +func (h *Handler) value(hd Handle) (any, error) { + v, ok := h.handles[hd] + if !ok { + return nil, errHandleMisuse + } + return v, nil +} + +// getTaskHandle checks if the task handle is valid and returns the corresponding TaskAPI. +func (h *Handler) getTaskHandle(task C.OONITask) (tp motor.TaskAPI) { + hd := Handle(task) + val, err := h.value(hd) + if err != nil { + log.Printf("getTaskHandle: %s", err.Error()) + return + } + tp, ok := val.(motor.TaskAPI) + if !ok { + log.Printf("getTaskHandle: invalid type assertion") + return + } + return +} diff --git a/internal/libooniengine/main.go b/internal/libooniengine/main.go new file mode 100644 index 0000000000..ef40808be8 --- /dev/null +++ b/internal/libooniengine/main.go @@ -0,0 +1,136 @@ +package main + +// +// C API +// + +//#include +// +//#include "engine.h" +import "C" + +import ( + "encoding/json" + "log" + "time" + "unsafe" + + "github.com/ooni/probe-cli/v3/internal/motor" + "github.com/ooni/probe-cli/v3/internal/version" +) + +const ( + // invalidTaskHandle represents the invalid task handle. + invalidTaskHandle = 0 +) + +// parse converts a JSON request string to the concrete Go type. +func parse(req *C.char) (*motor.Request, error) { + out := &motor.Request{} + if err := json.Unmarshal([]byte(C.GoString(req)), &out); err != nil { + return nil, err + } + return out, nil +} + +// serialize serializes a OONI response to a JSON string accessible to C code. +func serialize(resp *motor.Response) *C.char { + if resp == nil { + return nil + } + out, err := json.Marshal(resp) + if err != nil { + log.Printf("serializeMessage: cannot serialize message: %s", err.Error()) + return nil + } + return C.CString(string(out)) +} + +//export OONIEngineVersion +func OONIEngineVersion() *C.char { + return C.CString(version.Version) +} + +//export OONIEngineFreeMemory +func OONIEngineFreeMemory(ptr *C.void) { + C.free(unsafe.Pointer(ptr)) +} + +//export OONIEngineCall +func OONIEngineCall(req *C.char) C.OONITask { + r, err := parse(req) + if err != nil { + log.Printf("OONIEngineCall: %s", err.Error()) + return invalidTaskHandle + } + tp := motor.StartTask(r) + if tp == nil { + log.Printf("OONITaskStart: startTask returned NULL") + return invalidTaskHandle + } + handle, err := handler.newHandle(tp) + if err != nil { + log.Printf("OONITaskStart: %s", err.Error()) + return invalidTaskHandle + } + return C.OONITask(handle) +} + +//export OONIEngineWaitForNextEvent +func OONIEngineWaitForNextEvent(task C.OONITask, timeout C.int32_t) *C.char { + tp := handler.getTaskHandle(task) + if tp == nil { + return nil + } + var ev *motor.Response + if timeout <= 0 { + ev = tp.WaitForNextEvent(time.Duration(timeout)) + } else { + ev = tp.WaitForNextEvent(time.Duration(timeout) * time.Millisecond) + } + return serialize(ev) +} + +//export OONIEngineTaskGetResult +func OONIEngineTaskGetResult(task C.OONITask) *C.char { + tp := handler.getTaskHandle(task) + if tp == nil { + return nil + } + result := tp.Result() + return serialize(result) +} + +//export OONIEngineTaskIsDone +func OONIEngineTaskIsDone(task C.OONITask) (out C.uint8_t) { + tp := handler.getTaskHandle(task) + if tp == nil { + return + } + if !tp.IsDone() { + out++ + } + return +} + +//export OONIEngineInterruptTask +func OONIEngineInterruptTask(task C.OONITask) { + tp := handler.getTaskHandle(task) + if tp == nil { + return + } + tp.Interrupt() +} + +//export OONIEngineFreeTask +func OONIEngineFreeTask(task C.OONITask) { + tp := handler.getTaskHandle(task) + if tp != nil { + tp.Interrupt() + } + handler.delete(Handle(task)) +} + +func main() { + // do nothing +} diff --git a/internal/motor/emitter.go b/internal/motor/emitter.go new file mode 100644 index 0000000000..5af08c554f --- /dev/null +++ b/internal/motor/emitter.go @@ -0,0 +1,21 @@ +package motor + +// +// Emitter +// + +// taskChanEmitter implements taskMaybeEmitter. +type taskChanEmitter struct { + // out is the channel where we emit events. + out chan *Response +} + +var _ taskMaybeEmitter = &taskChanEmitter{} + +// maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent. +func (e *taskChanEmitter) maybeEmitEvent(resp *Response) { + select { + case e.out <- resp: + default: // buffer full, discard this event + } +} diff --git a/internal/motor/logger.go b/internal/motor/logger.go new file mode 100644 index 0000000000..9c2a04ea6a --- /dev/null +++ b/internal/motor/logger.go @@ -0,0 +1,91 @@ +package motor + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +type LogLevel string + +const ( + // The DEBUG log level. + logDebug LogLevel = "DEBUG" + + // The INFO log level. + logInfo LogLevel = "INFO" + + // The WARNING log level. + logWarning LogLevel = "WARNING" +) + +// LogResponse is the response for any logging task. +type LogResponse struct { + Level LogLevel `json:",omitempty"` + Message string `json:",omitempty"` +} + +// taskLogger implements model.Logger for tasks. +type taskLogger struct { + // emitter is used to emit log events. + emitter taskMaybeEmitter + + // verbose indicates whether verbose logging is enabled. + verbose bool +} + +// newLogger creates a new taskLogger instance using +// the emitter to emit log events. +func newTaskLogger(emitter taskMaybeEmitter, verbose bool) *taskLogger { + return &taskLogger{ + emitter: emitter, + verbose: verbose, + } +} + +var _ model.Logger = &taskLogger{} + +// Debugf implements model.Logger.Debugf. +func (tl *taskLogger) Debugf(format string, values ...any) { + if tl.verbose { + tl.emit(logDebug, fmt.Sprintf(format, values...)) + } +} + +// Debug implements model.Logger.Debug. +func (tl *taskLogger) Debug(message string) { + if tl.verbose { + tl.emit(logDebug, message) + } +} + +// Infof implements model.Logger.Infof. +func (tl *taskLogger) Infof(format string, values ...any) { + tl.emit(logInfo, fmt.Sprintf(format, values...)) +} + +// Info implements model.Logger.Info. +func (tl *taskLogger) Info(message string) { + tl.emit(logInfo, message) +} + +// Warnf implements model.Logger.Warnf. +func (tl *taskLogger) Warnf(format string, values ...any) { + tl.emit(logWarning, fmt.Sprintf(format, values...)) +} + +// Warn implements model.Logger.Warn. +func (tl *taskLogger) Warn(message string) { + tl.emit(logWarning, message) +} + +// emit emits a log message. +func (tl *taskLogger) emit(level LogLevel, message string) { + logResp := &Response{ + Logger: LogResponse{ + Level: level, + Message: message, + }, + } + tl.emitter.maybeEmitEvent(logResp) +} diff --git a/internal/motor/model.go b/internal/motor/model.go new file mode 100644 index 0000000000..ad748455b3 --- /dev/null +++ b/internal/motor/model.go @@ -0,0 +1,69 @@ +package motor + +import ( + "context" + "encoding/json" + "time" +) + +// request is the OONI request containing task name and arguments. +type Request struct { + Name string `json:",omitempty"` + Arguments json.RawMessage `json:",omitempty"` +} + +// response is the OONI response to serialize before sending. +type Response struct { + Logger LogResponse `json:",omitempty"` + Test testResponse `json:",omitempty"` + Error string `json:",omitempty"` +} + +// taskEventsBuffer is the buffer used for the task's event chan, which +// should guarantee enough buffering when the application is slow. +const taskEventsBuffer = 1024 + +// taskMaybeEmitter emits events, if possible. We use a buffered +// channel with a large buffer for collecting task events. We expect +// the application to always be able to drain the channel timely. Yet, +// if that's not the case, it is fine to discard events. This data +// type implement such a discard-if-reader is slow behaviour. +type taskMaybeEmitter interface { + // maybeEmitEvent emits an event if there's available buffer in the + // output channel and otherwise discards the event. + maybeEmitEvent(resp *Response) +} + +// taskRunner runs a given task. Any task that you can run from +// the application must implement this interface. +type taskRunner interface { + // Main runs the task to completion. + // + // Arguments: + // + // - ctx is the context for deadline/cancellation/timeout; + // + // - emitter is the emitter to emit events; + // + // - req is the parsed request containing task specific arguments. + main(ctx context.Context, emitter taskMaybeEmitter, req *Request, resp *Response) +} + +// taskAPI implements the OONI engine C API functions. We use this interface +// to enable easier testing of the code that manages the tasks lifecycle. +type TaskAPI interface { + // waitForNextEvent implements OONITaskWaitForNextEvent. + WaitForNextEvent(timeout time.Duration) *Response + + // GetResult implements OONITaskGetResult + Result() *Response + + // isDone implements OONITaskIsDone. + IsDone() bool + + // interrupt implements OONITaskInterrupt. + Interrupt() +} + +// taskRegistry maps each task name to its implementation. +var taskRegistry = map[string]taskRunner{} diff --git a/internal/motor/task.go b/internal/motor/task.go new file mode 100644 index 0000000000..c4d214a152 --- /dev/null +++ b/internal/motor/task.go @@ -0,0 +1,113 @@ +package motor + +import ( + "context" + "errors" + "log" + "sync/atomic" + "time" +) + +var ( + errInvalidRequest = errors.New("input request has no valid task name") +) + +// startTask starts a given task. +func StartTask(req *Request) TaskAPI { + ctx, cancel := context.WithCancel(context.Background()) + tp := &taskState{ + cancel: cancel, + done: &atomic.Int64{}, + events: make(chan *Response, taskEventsBuffer), + result: make(chan *Response, 1), + stopped: make(chan any), + } + go tp.main(ctx, req) + return tp +} + +// task implements taskAPI. +type taskState struct { + // cancel cancels the context used by this task. + cancel context.CancelFunc + + // done indicates that this task is done. + done *atomic.Int64 + + // events is the channel where we emit task events. + events chan *Response + + // result is the channel where we emit the final result. + result chan *Response + + // stopped indicates that the task is done. + stopped chan any +} + +var _ TaskAPI = &taskState{} + +// WaitForNextEvent implements TaskAPI.WaitForNextEvent. +func (tp *taskState) WaitForNextEvent(timeout time.Duration) *Response { + // Implementation note: we don't need to log any of these nil-returning conditions + // as they are not exceptional, rather they're part of normal usage. + ctx, cancel := contextForWaitForNextEvent(timeout) + defer cancel() + select { + case <-ctx.Done(): + return nil // timeout while blocking for reading + case ev := <-tp.events: + return ev // ordinary chan reading + case <-tp.stopped: + select { + case ev := <-tp.events: + return ev // still draining the chan + default: + tp.done.Add(1) // fully drained so we can flip "done" now + return nil + } + } +} + +// Result implements TaskAPI.Result +func (tp *taskState) Result() *Response { + return <-tp.result +} + +// contextForWaitForNextEvent returns the suitable context +// for making the waitForNextEvent function time bounded. +func contextForWaitForNextEvent(timeout time.Duration) (context.Context, context.CancelFunc) { + ctx := context.Background() + if timeout < 0 { + return context.WithCancel(ctx) + } + return context.WithTimeout(ctx, timeout) +} + +// IsDone implements TaskAPI.IsDone. +func (tp *taskState) IsDone() bool { + return tp.done.Load() > 0 +} + +// Interrupt implements TaskAPI.Interrupt. +func (tp *taskState) Interrupt() { + tp.cancel() +} + +// main is the main function of the task. +func (tp *taskState) main(ctx context.Context, req *Request) { + defer close(tp.stopped) // synchronize with caller + taskName := req.Name + resp := &Response{} + runner := taskRegistry[taskName] + if runner == nil { + log.Printf("OONITaskStart: unknown task name: %s", taskName) + resp.Error = errInvalidRequest.Error() + tp.result <- resp + return + } + emitter := &taskChanEmitter{ + out: tp.events, + } + runner.main(ctx, emitter, req, resp) + tp.result <- resp // emit response to result channel +} diff --git a/internal/motor/test.go b/internal/motor/test.go new file mode 100644 index 0000000000..a6b64be539 --- /dev/null +++ b/internal/motor/test.go @@ -0,0 +1,55 @@ +package motor + +// +// test is a mock task to mimic the request-response API for the FFI consumer. +// + +import ( + "context" + "encoding/json" + "errors" +) + +func init() { + taskRegistry["Test"] = &testTaskRunner{} +} + +var ( + errTestDisabled = errors.New("request argument for test disabled") + + errParseFailed = errors.New("unable to parse task arguments") +) + +// testOptions contains the request options for the Test task. +type testOptions struct { + Test bool `json:",omitempty"` +} + +// testResponse is the response for the Test task. +type testResponse struct { + Response string `json:",omitempty"` + Error string `json:"omitempty"` +} + +type testTaskRunner struct{} + +var _ taskRunner = &testTaskRunner{} + +// main implements taskRunner.main +func (tr *testTaskRunner) main(ctx context.Context, emitter taskMaybeEmitter, + req *Request, resp *Response) { + logger := newTaskLogger(emitter, false) + args := &testOptions{} + if err := json.Unmarshal(req.Arguments, args); err != nil { + logger.Warn("task_runner: %s") + resp.Test.Error = errParseFailed.Error() + return + } + if !args.Test { + logger.Warnf("task_runner: %s", errTestDisabled.Error()) + resp.Test.Error = errTestDisabled.Error() + return + } + logger.Info("task_runner: a log event for the Test task") + resp.Test.Response = "test success" +}