Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v2
- name: test
run: pushd ffi && cargo b --features default-engine && make test && popd
run: pushd ffi && cargo b --features default-engine,sync-engine && make test && popd
5 changes: 5 additions & 0 deletions ffi/.clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BasedOnStyle: Mozilla
ColumnLimit: 100
BreakBeforeBraces: Attach
AlwaysBreakAfterReturnType: None
AlwaysBreakAfterDefinitionReturnType: None
2 changes: 1 addition & 1 deletion ffi/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ SOURCES=cffi-test.c
INCPATHS=../target/ffi-headers
LIBPATHS=../target/debug
LDFLAGS=-ldelta_kernel_ffi #`pkg-config --libs arrow-glib`
CFLAGS=-c -Wall -DDEFINE_DEFAULT_ENGINE #`pkg-config --cflags arrow-glib`
CFLAGS=-c -Wall -DDEFINE_DEFAULT_ENGINE -DDEFINE_SYNC_ENGINE #`pkg-config --cflags arrow-glib`
CC=gcc

OBJECTS=$(SOURCES:.c=.o)
Expand Down
10 changes: 10 additions & 0 deletions ffi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ table=../kernel/tests/data/table-without-dv-small make run


This will place libraries in the root `target` dir (`../target/[debug,release]` from the directory containing this README), and headers in `../target/ffi-headers`. In that directory there will be a `delta_kernel_ffi.h` file, which is the C header, and a `delta_kernel_ffi.hpp` which is the C++ header.

### C/C++ Extension (VSCode)

By default the VSCode C/C++ Extension does not use any defines flags. You can open `settings.json` and set the following line:
```
"C_Cpp.default.defines": [
"DEFINE_DEFAULT_ENGINE",
"DEFINE_SYNC_ENGINE"
]
```
66 changes: 41 additions & 25 deletions ffi/cffi-test.c
Original file line number Diff line number Diff line change
@@ -1,46 +1,39 @@
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <inttypes.h>

#include "delta_kernel_ffi.h"

void visit_callback(void* engine_context, KernelStringSlice path, int64_t size, const DvInfo *dv_info, const CStringMap *partition_values) {
void visit_callback(void* engine_context,
KernelStringSlice path,
int64_t size,
const DvInfo* dv_info,
const CStringMap* partition_values) {
printf("file: %.*s\n", (int)path.len, path.ptr);
}


void visit_data(void *engine_context, EngineData *engine_data, const KernelBoolSlice selection_vec) {
void visit_data(void* engine_context,
EngineData* engine_data,
const KernelBoolSlice selection_vec) {
visit_scan_data(engine_data, selection_vec, engine_context, visit_callback);
}

int main(int argc, char* argv[]) {

if (argc < 2) {
printf("Usage: %s table/path\n", argv[0]);
return -1;
}

char* table_path = argv[1];
printf("Reading table at %s\n", table_path);

KernelStringSlice table_path_slice = {table_path, strlen(table_path)};

ExternResultHandleSharedExternEngine engine_res =
get_default_engine(table_path_slice, NULL);
int test_engine(KernelStringSlice table_path_slice,
ExternResultHandleSharedExternEngine engine_res) {
if (engine_res.tag != OkHandleSharedExternEngine) {
printf("Failed to get engine\n");
return -1;
}

SharedExternEngine *engine = engine_res.ok;
SharedExternEngine* engine = engine_res.ok;

ExternResultHandleSharedSnapshot snapshot_res = snapshot(table_path_slice, engine);
if (snapshot_res.tag != OkHandleSharedSnapshot) {
printf("Failed to create snapshot\n");
return -1;
}

SharedSnapshot *snapshot = snapshot_res.ok;
SharedSnapshot* snapshot = snapshot_res.ok;

uint64_t v = version(snapshot);
printf("version: %" PRIu64 "\n", v);
Expand All @@ -50,16 +43,15 @@ int main(int argc, char* argv[]) {
return -1;
}

SharedScan *scan = scan_res.ok;
SharedScan* scan = scan_res.ok;

ExternResultHandleSharedScanDataIterator data_iter_res =
kernel_scan_data_init(engine, scan);
ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_data_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator) {
printf("Failed to construct scan data iterator\n");
return -1;
}

SharedScanDataIterator *data_iter = data_iter_res.ok;
SharedScanDataIterator* data_iter = data_iter_res.ok;

// iterate scan files
for (;;) {
Expand All @@ -75,6 +67,30 @@ int main(int argc, char* argv[]) {
drop_scan(scan);
drop_snapshot(snapshot);
drop_engine(engine);

return 0;
}

int main(int argc, char* argv[]) {

if (argc < 2) {
printf("Usage: %s table/path\n", argv[0]);
return -1;
}

char* table_path = argv[1];
printf("Reading table at %s\n", table_path);

KernelStringSlice table_path_slice = { table_path, strlen(table_path) };

ExternResultHandleSharedExternEngine default_engine_res =
get_default_engine(table_path_slice, NULL);
ExternResultHandleSharedExternEngine sync_engine_res = get_sync_engine(NULL);

printf("Executing with default engine\n");
int default_test_res = test_engine(table_path_slice, default_engine_res);
printf("Executing with sync engine\n");
int sync_test_res = test_engine(table_path_slice, sync_engine_res);

// return 0 iff neither test passes
return default_test_res | sync_test_res;
}
42 changes: 36 additions & 6 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// FFI interface for the delta kernel
///
/// Exposes that an engine needs to call from C/C++ to interface with kernel
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
#[cfg(feature = "default-engine")]
use std::collections::HashMap;
use std::default::Default;
use std::os::raw::{c_char, c_void};
Expand Down Expand Up @@ -297,7 +297,7 @@ pub enum ExternResult<T> {
}

pub type AllocateErrorFn =
extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError;
Option<extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError>;

// NOTE: We can't "just" impl From<DeltaResult<T>> because we require an error allocator.
impl<T> ExternResult<T> {
Expand Down Expand Up @@ -334,7 +334,13 @@ impl AllocateError for AllocateErrorFn {
etype: KernelError,
msg: KernelStringSlice,
) -> *mut EngineError {
self(etype, msg)
match self {
Some(error_fn) => error_fn(etype, msg),
None => {
Comment thread
abrassel marked this conversation as resolved.
Outdated
let msg = unsafe { String::try_from_slice(msg) }.expect("invalid string slice");
panic!("{etype:?}: {msg}");
}
}
}
}

Expand Down Expand Up @@ -430,14 +436,14 @@ unsafe fn unwrap_and_parse_path_as_url(path: KernelStringSlice) -> DeltaResult<U
}

/// A builder that allows setting options on the `Engine` before actually building it
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
#[cfg(feature = "default-engine")]
pub struct EngineBuilder {
url: Url,
allocate_fn: AllocateErrorFn,
options: HashMap<String, String>,
}

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
#[cfg(feature = "default-engine")]
impl EngineBuilder {
fn set_option(&mut self, key: String, val: String) {
self.options.insert(key, val);
Expand Down Expand Up @@ -491,9 +497,10 @@ pub unsafe extern "C" fn set_builder_option(
builder.set_option(key.unwrap(), value.unwrap());
}

/// Consume the builder and return an engine. After calling, the passed pointer is _no
/// Consume the builder and return a `default` engine. After calling, the passed pointer is _no
/// longer valid_.
///
///
/// # Safety
///
/// Caller is responsible to pass a valid EngineBuilder pointer, and to not use it again afterwards
Expand Down Expand Up @@ -533,6 +540,17 @@ fn get_default_default_engine_impl(
get_default_engine_impl(url?, Default::default(), allocate_error)
}

/// # Safety
///
/// Caller is responsible for passing a valid path pointer.
#[cfg(feature = "sync-engine")]
#[no_mangle]
pub unsafe extern "C" fn get_sync_engine(
allocate_error: AllocateErrorFn,
) -> ExternResult<Handle<SharedExternEngine>> {
get_sync_engine_impl(allocate_error).into_extern_result(&allocate_error)
}

#[cfg(feature = "default-engine")]
fn get_default_engine_impl(
url: Url,
Expand All @@ -553,6 +571,18 @@ fn get_default_engine_impl(
Ok(engine.into())
}

#[cfg(feature = "sync-engine")]
fn get_sync_engine_impl(
allocate_error: AllocateErrorFn,
) -> DeltaResult<Handle<SharedExternEngine>> {
let engine = delta_kernel::engine::sync::SyncEngine::new();
let engine: Arc<dyn ExternEngine> = Arc::new(ExternEngineVtable {
engine: Arc::new(engine),
allocate_error,
});
Ok(engine.into())
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
Expand Down