Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ jobs:
run: |
cargo clean
# rmp-serde needs a feature activated, so we'll just run it separately.
cargo run --example 2>&1 | grep -E '^ ' | awk '!/rmp-serde/' | xargs -n1 cargo run --example
# We also want to run the nested-rtxns example in release.
cargo run --example 2>&1 | grep -E '^ ' | awk '!/rmp-serde|nested-rtxns/' | xargs -n1 cargo run --example
cargo run --example rmp-serde --features serde-rmp
cargo run --release --example nested-rtxns

heed3-examples:
name: Run the heed3 examples
Expand Down
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[submodule "lmdb-master-sys/lmdb"]
path = lmdb-master-sys/lmdb
url = https://git.openldap.org/openldap/openldap
branch = mdb.master
url = https://github.com/meilisearch/lmdb.git
branch = allow-nested-rtxn-from-wtxn
[submodule "lmdb-master3-sys/lmdb"]
path = lmdb-master3-sys/lmdb
url = https://git.openldap.org/openldap/openldap
Expand Down
10 changes: 8 additions & 2 deletions heed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "heed"
version = "0.22.0"
version = "0.22.1-nested-rtxns-6"
authors = ["Kerollmops <[email protected]>"]
description = "A fully typed LMDB (mdb.master) wrapper with minimum overhead"
license = "MIT"
Expand All @@ -16,14 +16,17 @@ byteorder = { version = "1.5.0", default-features = false }
heed-traits = { version = "0.20.0", path = "../heed-traits" }
heed-types = { version = "0.21.0", default-features = false, path = "../heed-types" }
libc = "0.2.175"
lmdb-master-sys = { version = "0.2.5", path = "../lmdb-master-sys" }
lmdb-master-sys = { version = "0.2.6-nested-rtxns-6", path = "../lmdb-master-sys" }
once_cell = "1.21.3"
page_size = "0.6.0"
serde = { version = "1.0.223", features = ["derive"], optional = true }
synchronoise = "1.0.1"

[dev-dependencies]
memchr = "2.7.5"
rand = "0.9.0"
rayon = "1.10.0"
roaring = "0.10.10"
serde = { version = "1.0.223", features = ["derive"] }
tempfile = "3.22.0"

Expand Down Expand Up @@ -122,6 +125,9 @@ name = "custom-dupsort-comparator"
[[example]]
name = "multi-env"

[[example]]
name = "nested-rtxns"

[[example]]
name = "nested"

Expand Down
80 changes: 80 additions & 0 deletions heed/examples/nested-rtxns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use heed::types::*;
use heed::{Database, EnvFlags, EnvOpenOptions};
use rand::prelude::*;
use rayon::prelude::*;
use roaring::RoaringBitmap;

/// Fills `buffer` with the serialized bitmap that this example associates with `key`.
///
/// The bitmap contains `0..max`, where `max` is drawn from an RNG seeded with `key`.
/// The writer and every reader can therefore rebuild the exact same bytes for a
/// given key without sharing any state. Any previous content of `buffer` is discarded.
fn fill_expected_bitmap(key: u32, buffer: &mut Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
    let mut rng = StdRng::seed_from_u64(u64::from(key));
    let max: u32 = rng.random_range(10_000..=100_000);
    let roaring = RoaringBitmap::from_sorted_iter(0..max)?;
    buffer.clear();
    roaring.serialize_into(&mut *buffer)?;
    Ok(())
}

/// Opens one nested read transaction per key in `keys` and checks, in parallel,
/// that each of them sees the (still uncommitted) value written for its key.
///
/// Every read transaction is paired with the key it must check, so ranges that do
/// not start at zero are verified against the right entries.
///
/// # Panics
///
/// Panics if a lookup fails or if a stored value differs from the expected bytes.
fn assert_visible_in_nested_rtxns(
    env: &heed::Env<heed::WithoutTls>,
    wtxn: &heed::RwTxn,
    db: Database<U32<byteorder::BigEndian>, Bytes>,
    keys: std::ops::Range<u32>,
) -> heed::Result<()> {
    let rtxns = keys
        .map(|key| env.nested_read_txn(wtxn).map(|rtxn| (key, rtxn)))
        .collect::<heed::Result<Vec<_>>>()?;

    rtxns.into_par_iter().for_each(|(key, rtxn)| {
        let mut expected = Vec::new();
        fill_expected_bitmap(key, &mut expected).unwrap();

        let ret = db.get(&rtxn, &key).unwrap();
        assert_eq!(ret, Some(&expected[..]));
    });

    Ok(())
}

/// Writes two batches of entries in a single write transaction and, after each
/// batch, reads them back through nested read transactions without committing.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let dir = tempfile::tempdir()?;
    // SAFETY: the environment lives in a fresh temporary directory, so nothing else
    // is expected to open or modify it while this example runs.
    let env = unsafe {
        let mut options = EnvOpenOptions::new().read_txn_without_tls();
        #[cfg(not(windows))]
        options.flags(EnvFlags::WRITE_MAP);
        options
            .map_size(2 * 1024 * 1024 * 1024) // 2 GiB
            .open(dir.path())?
    };

    // opening a write transaction
    let mut wtxn = env.write_txn()?;
    // we will open the default unnamed database
    let db: Database<U32<byteorder::BigEndian>, Bytes> = env.create_database(&mut wtxn, None)?;

    // One buffer is reused for every write to avoid an allocation per entry.
    let mut buffer = Vec::new();
    for keys in [0..1000, 1000..10_000] {
        for key in keys.clone() {
            fill_expected_bitmap(key, &mut buffer)?;
            db.put(&mut wtxn, &key, &buffer)?;
        }

        // opening multiple read-only transactions
        // to check if the values of this batch are now available
        // without committing beforehand
        assert_visible_in_nested_rtxns(&env, &wtxn, db, keys)?;
    }

    Ok(())
}
87 changes: 80 additions & 7 deletions heed/src/envs/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use heed_traits::Comparator;
use synchronoise::SignalEvent;

use super::{
custom_key_cmp_wrapper, get_file_fd, metadata_from_fd, DefaultComparator, EnvClosingEvent,
EnvInfo, FlagSetMode, IntegerComparator, OPENED_ENV,
custom_key_cmp_wrapper, get_file_fd, DefaultComparator, EnvClosingEvent, EnvInfo, FlagSetMode,
IntegerComparator, OPENED_ENV,
};
use crate::cursor::{MoveOperation, RoCursor};
use crate::mdb::ffi::{self, MDB_env};
Expand All @@ -22,8 +22,8 @@ use crate::mdb::lmdb_flags::AllDatabaseFlags;
#[allow(unused)] // for cargo auto doc links
use crate::EnvOpenOptions;
use crate::{
CompactionOption, Database, DatabaseOpenOptions, EnvFlags, Error, Result, RoTxn, RwTxn,
Unspecified, WithTls,
assert_eq_env_txn, CompactionOption, Database, DatabaseOpenOptions, EnvFlags, Error, Result,
RoTxn, RwTxn, Unspecified, WithTls, WithoutTls,
};

/// An environment handle constructed by using [`EnvOpenOptions::open`].
Expand Down Expand Up @@ -63,11 +63,27 @@ impl<T> Env<T> {
/// # Ok(()) }
/// ```
pub fn real_disk_size(&self) -> Result<u64> {
    // Work on a cloned handle of the environment file rather than on the raw fd.
    let file = self.try_clone_inner_file()?;
    // The reported size is the length found in the file's metadata.
    let metadata = file.metadata()?;
    Ok(metadata.len())
}

/// Try cloning the inner file used in the environment and return a `File`
/// corresponding to the environment file.
///
/// # Safety
///
/// This function is safe to call because it only returns a clone of the file descriptor
/// of the environment's inner file. However, doing write operations on the returned file
/// can lead to undefined behavior: only read-only operations, performed while no write
/// operations are in progress, are safe.
pub fn try_clone_inner_file(&self) -> Result<File> {
let mut fd = mem::MaybeUninit::uninit();
unsafe { mdb_result(ffi::mdb_env_get_fd(self.env_mut_ptr().as_mut(), fd.as_mut_ptr()))? };
let fd = unsafe { fd.assume_init() };
let metadata = unsafe { metadata_from_fd(fd)? };
Ok(metadata.len())
let raw_fd = unsafe { fd.assume_init() };
#[cfg(unix)]
let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) };
#[cfg(windows)]
let fd = unsafe { std::os::windows::io::BorrowedHandle::borrow_raw(raw_fd) };
let owned = fd.try_clone_to_owned()?;
Ok(File::from(owned))
}

/// Return the raw flags the environment was opened with.
Expand Down Expand Up @@ -343,6 +359,8 @@ impl<T> Env<T> {
/// A parent transaction and its cursors may not issue any other operations than _commit_ and
/// _abort_ while it has active child transactions.
pub fn nested_write_txn<'p>(&'p self, parent: &'p mut RwTxn) -> Result<RwTxn<'p>> {
// NOTE(review): presumably rejects (panics on) a `parent` that was not created from
// this environment; confirm against the `assert_eq_env_txn!` definition.
assert_eq_env_txn!(self, parent);

RwTxn::nested(self, parent)
}

Expand Down Expand Up @@ -607,6 +625,61 @@ impl<T> Env<T> {
}
}

impl Env<WithoutTls> {
/// Create a nested read transaction that is capable of reading uncommitted changes.
///
/// The new transaction will be a nested transaction, with the transaction indicated by `parent`
/// as its parent. Transactions may be nested to any level.
///
/// This is a custom LMDB fork feature that allows reading uncommitted changes.
/// It enables parallel processing of data across multiple threads through
/// concurrent read-only transactions. You can [read more in this PR](https://github.com/meilisearch/heed/pull/307).
///
/// This method is only compiled when the `master3` cfg is not set.
///
/// # Examples
///
/// ```
/// use std::fs;
/// use std::path::Path;
/// use heed::{EnvOpenOptions, Database};
/// use heed::types::*;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let dir = tempfile::tempdir()?;
/// let env = unsafe {
///     EnvOpenOptions::new()
///         .read_txn_without_tls()
///         .map_size(2 * 1024 * 1024) // 2 MiB
///         .open(dir.path())?
/// };
///
/// // opening a write transaction
/// let mut wtxn = env.write_txn()?;
/// // we will open the default unnamed database
/// let db: Database<U32<byteorder::BigEndian>, U32<byteorder::BigEndian>> = env.create_database(&mut wtxn, None)?;
///
/// // writing entries that are not committed yet
/// for i in 0..1000 {
///     db.put(&mut wtxn, &i, &i)?;
/// }
///
/// // opening multiple read-only transactions
/// // to check if those values are now available
/// // without committing beforehand
/// let rtxns = (0..1000).map(|_| env.nested_read_txn(&wtxn)).collect::<heed::Result<Vec<_>>>()?;
///
/// for (i, rtxn) in rtxns.iter().enumerate() {
///     let i = i as u32;
///     let ret = db.get(&rtxn, &i)?;
///     assert_eq!(ret, Some(i));
/// }
///
/// # Ok(()) }
/// ```
#[cfg(not(master3))]
pub fn nested_read_txn<'p>(&'p self, parent: &'p RwTxn) -> Result<RoTxn<'p, WithoutTls>> {
// NOTE(review): presumably rejects (panics on) a `parent` that was not created from
// this environment; confirm against the `assert_eq_env_txn!` definition.
assert_eq_env_txn!(self, parent);

RoTxn::<WithoutTls>::nested(self, parent)
}
}

impl<T> Clone for Env<T> {
fn clone(&self) -> Self {
Env { inner: self.inner.clone(), _tls_marker: PhantomData }
Expand Down
22 changes: 3 additions & 19 deletions heed/src/envs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::ffi::c_void;
use std::fs::{File, Metadata};
use std::fs::File;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, BorrowedFd, RawFd};
use std::os::unix::io::{AsRawFd, RawFd};
use std::panic::catch_unwind;
use std::path::{Path, PathBuf};
use std::process::abort;
Expand All @@ -12,7 +12,7 @@ use std::time::Duration;
#[cfg(windows)]
use std::{
ffi::OsStr,
os::windows::io::{AsRawHandle as _, BorrowedHandle, RawHandle},
os::windows::io::{AsRawHandle as _, RawHandle},
};
use std::{fmt, io};

Expand Down Expand Up @@ -131,22 +131,6 @@ fn get_file_fd(file: &File) -> RawHandle {
file.as_raw_handle()
}

#[cfg(unix)]
/// Get metadata from a file descriptor.
unsafe fn metadata_from_fd(raw_fd: RawFd) -> io::Result<Metadata> {
let fd = BorrowedFd::borrow_raw(raw_fd);
let owned = fd.try_clone_to_owned()?;
File::from(owned).metadata()
}

#[cfg(windows)]
/// Get metadata from a file descriptor.
unsafe fn metadata_from_fd(raw_fd: RawHandle) -> io::Result<Metadata> {
let fd = BorrowedHandle::borrow_raw(raw_fd);
let owned = fd.try_clone_to_owned()?;
File::from(owned).metadata()
}

/// A helper function that transforms the LMDB types into Rust types (`MDB_val` into slices)
/// and vice versa, the Rust types into C types (`Ordering` into an integer).
///
Expand Down
27 changes: 27 additions & 0 deletions heed/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,33 @@ impl<'e, T> RoTxn<'e, T> {
self.inner.txn.unwrap()
}

/// Begins a read-only transaction whose parent is the write transaction `parent`.
///
/// Calls `mdb_txn_begin` with the raw pointer of `parent` and the `MDB_RDONLY` flag,
/// then wraps the returned transaction pointer in a `RoTxn` that borrows `env`.
/// Any error reported by `mdb_txn_begin` is returned to the caller.
#[cfg(not(master3))]
pub(crate) fn nested<'p>(
env: &'p Env<WithoutTls>,
parent: &'p RwTxn,
) -> Result<RoTxn<'p, WithoutTls>> {
let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
// Raw pointer of the parent write transaction; the `unwrap` panics if `parent`
// no longer holds a transaction pointer.
let parent_ptr: *mut ffi::MDB_txn = unsafe { parent.txn.inner.txn.unwrap().as_mut() };

unsafe {
// Begin a transaction flagged `MDB_RDONLY` with the write transaction as its parent.
// NOTE(review): this relies on the forked LMDB accepting a read-only child of a
// write transaction; confirm against the fork's `mdb_txn_begin`.
mdb_result(ffi::mdb_txn_begin(
env.env_mut_ptr().as_mut(),
parent_ptr,
ffi::MDB_RDONLY,
&mut txn,
))?
};

// The new transaction is handed out as a `RoTxn`, so the user
// cannot make any modification through it.
Ok(RoTxn {
inner: RoTxnInner { txn: NonNull::new(txn), env: Cow::Borrowed(&env.inner) },
_tls_marker: PhantomData,
})
}

pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
self.inner.env.env_mut_ptr()
}
Expand Down
2 changes: 1 addition & 1 deletion lmdb-master-sys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lmdb-master-sys"
# NB: When modifying, also modify html_root_url in lib.rs
version = "0.2.5"
version = "0.2.6-nested-rtxns-6"
authors = [
"Kerollmops <[email protected]>",
"Dan Burkert <[email protected]>",
Expand Down
6 changes: 5 additions & 1 deletion lmdb-master-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ fn main() {
// https://github.com/mozilla/lmdb/blob/b7df2cac50fb41e8bd16aab4cc5fd167be9e032a/libraries/liblmdb/Makefile#L23
.flag_if_supported("-Wno-unused-parameter")
.flag_if_supported("-Wbad-function-cast")
.flag_if_supported("-Wuninitialized");
.flag_if_supported("-Wuninitialized")
// MSVC only: compile as C11, which `/experimental:c11atomics` below requires
.flag_if_supported("/std:c11")
// Explicitly enable C11 atomics support
.flag_if_supported("/experimental:c11atomics");

// On Windows, link explicitly advapi32 for security functions
#[cfg(target_os = "windows")]
Expand Down
2 changes: 1 addition & 1 deletion lmdb-master-sys/lmdb
Loading