Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide store_batch function to storage backend in order to save batch of values atomically #1078

Closed
wants to merge 1 commit into from

Conversation

andrussal
Copy link
Contributor

@andrussal andrussal commented Feb 25, 2025

1. What does this PR implement?

Provide store_batch function to storage backend in order to save batch of values atomically.

Not sure if this is the right way to go. Just opened for comments.

The reason of creating this change is related to this PR where we should store multiple keys atomically: #1075

Justification why this function is needed in addition to existing storage interface.

Current storage interface provides general purpose execute function that allows to run arbitrary code within a context of a transaction. In order to call this functions, caller needs to provide backend specific transaction instance. It seems that currently it’s not straightforward to create such Transaction from an OverWatch service.

Another justification to add this function to storage interface is that this should be a fairly common use case how transactions are used in the system - storing batch of values together.

Alternately could try to refactor current execute solution so that it doesn’t require passing backend specific Transaction instance. I started looking into it but this seems to be much larger effort than providing batch function directly.

2. Does the code have enough context to be clearly understood?

Yes

3. Who are the specification authors and who is accountable for this PR?

@andrussal

4. Is the specification accurate and complete?

N/A

5. Does the implementation introduce changes in the specification?

N/A

Checklist

  • 1. Description added.
  • 2. Context and links to Specification document(s) added.
  • 3. Main contact(s) (developers and specification authors) added
  • 4. Implementation and Specification are 100% in sync including changes. This is critical.
  • 5. Link PR to a specific milestone.

Comment on lines +47 to +48
/// Store batch of key/value pairs in one transaction
async fn store_batch(&mut self, keys_values: Vec<(Bytes, Bytes)>) -> Result<(), Self::Error>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is that if somehow the backend does not support it we are forcing its implementation. Instead, this should be done through execute if possible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment with a proposal. I see the problem we are facing better now. We would end up with the same problem I stated above but in a different place. But I think the other way is a bit better to reuse the current API without changes.

Copy link
Collaborator

@danielSanchezQ danielSanchezQ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case we could add the call to be implemented by the storage transaction type:

/// Trait to abstract storage transactions return and operation types
pub trait StorageTransaction: Send + Sync {
    type Result: Send + Sync;
    type Transaction: Send + Sync;

    fn batch_store(to_store: Vec<(Bytes, Bytes)>) -> Transaction;
}

That should build the computation to create a batch store transaction.
wdyt?

@andrussal
Copy link
Contributor Author

andrussal commented Feb 25, 2025

I feel like that I have some more "fundamental" things unclear about the execute functionality and how to use it properly.

I have been thinking how our transaction logic matches with the idea of database transaction. In the sense that it does something atomically from database perspective. I think that our current transaction "framework" actually doesn't really do it by default. For example, in RocksDb test, it makes multiple DB calls in a single closure but this doesn't use database implementation transaction functionality.

let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
        let txn = db.txn(|db| {
            let key = "foo";
            let value = "bar";
            db.put(key, value)?;
            let result = db.get(key)?;
            db.delete(key)?;
            Ok(result.map(|ivec| ivec.to_vec().into()))
        });
        let result = db.execute(txn).await??;

Here db.txn calls put and delete but they are actually not part of a database transaction, just separate calls.

Here we could potentially ask actual db transaction from db, something like let tx = db.init_transaction(), and then do tx.put, tx.delete and tx.commit. Alternatively, instead of db argument, closure could already provide actual db implementation transaction type, so it becomes db.txn(|tx|.


Another thing that is a bit unclear to me is how to create transaction object from an OverWatch service. Execute message takes transaction as argument.

Execute {
        transaction: Backend::Transaction,
        reply_channel:
            tokio::sync::oneshot::Sender<<Backend::Transaction as StorageTransaction>::Result>,
    }
    

For example RocksDb transaction can be constructed like this:

  impl<SerdeOp> RocksBackend<SerdeOp> {
    pub fn txn(
        &self,
        executor: impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static,
    ) -> Transaction {
        Transaction {
            rocks: self.rocks.clone(),
            executor: Box::new(executor),
        }
    }
}

This requires caller to have access to RocksBackend and OverWatch service doesn't have it.

I was thinking maybe it makes sense to change transaction type in Execute message to something that doesn't reference db implementation. Maybe only contains impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static.

Execute {
        transaction: OverWatch::Transaction,
        ...

Also it seems that we may need to switch to async closure.

OverWatch service needs to prepare this Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync> for Execute message.

Here &DB argument references specific db implementation, like RocksDb DB. But OverWatch service probably should have access only to the generic StorageBackend. This means that Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync> doesn't work. Instead it needs to be some kind of future because StorageBackend functions are async, whereas DB is sync.


So I guess my current feeling is that it's not straightforward to use existing execute implementation from OverWatch.

@danielSanchezQ
Copy link
Collaborator

I feel like that I have some more "fundamental" things unclear about the execute functionality and how to use it properly.

I have been thinking how our transaction logic matches with the idea of database transaction. In the sense that it does something atomically from database perspective. I think that our current transaction "framework" actually doesn't really do it by default. For example, in RocksDb test, it makes multiple DB calls in a single closure but this doesn't use database implementation transaction functionality.

let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
        let txn = db.txn(|db| {
            let key = "foo";
            let value = "bar";
            db.put(key, value)?;
            let result = db.get(key)?;
            db.delete(key)?;
            Ok(result.map(|ivec| ivec.to_vec().into()))
        });
        let result = db.execute(txn).await??;

Here db.txn calls put and delete but they are actually not part of a database transaction, just separate calls.

Here we could potentially ask actual db transaction from db, something like let tx = db.init_transaction(), and then do tx.put, tx.delete and tx.commit. Alternatively, instead of db argument, closure could already provide actual db implementation transaction type, so it becomes db.txn(|tx|.

Another thing that is a bit unclear to me is how to create transaction object from an OverWatch service. Execute message takes transaction as argument.

Execute {
        transaction: Backend::Transaction,
        reply_channel:
            tokio::sync::oneshot::Sender<<Backend::Transaction as StorageTransaction>::Result>,
    }
    

For example RocksDb transaction can be constructed like this:

  impl<SerdeOp> RocksBackend<SerdeOp> {
    pub fn txn(
        &self,
        executor: impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static,
    ) -> Transaction {
        Transaction {
            rocks: self.rocks.clone(),
            executor: Box::new(executor),
        }
    }
}

This requires caller to have access to RocksBackend and OverWatch service doesn't have it.

I was thinking maybe it makes sense to change transaction type in Execute message to something that doesn't reference db implementation. Maybe only contains impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static.

Execute {
        transaction: OverWatch::Transaction,
        ...

Also it seems that we may need to switch to async closure.

OverWatch service needs to prepare this Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync> for Execute message.

Here &DB argument references specific db implementation, like RocksDb DB. But OverWatch service probably should have access only to the generic StorageBackend. This means that Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync> doesn't work. Instead it needs to be some kind of future because StorageBackend functions are async, whereas DB is sync.

So I guess my current feeling is that it's not straightforward to use existing execute implementation from OverWatch.

Yes. The transaction naming is missleading. It should actually just be called execute or something similar. In reality only the kv operations are abstracted. For anything else if implementation specific and that means you need to hardcode the type before hand.
The idea with execute is that you get access to a hook to the specific database and then you do whatever you need.
Changing the call to be async is actually a good idea. We could extend the api to have an execute_async in addition to what we have maybe?

@andrussal
Copy link
Contributor Author

Yes. The transaction naming is missleading. It should actually just be called execute or something similar. In reality only the kv operations are abstracted. For anything else if implementation specific and that means you need to hardcode the type before hand. The idea with execute is that you get access to a hook to the specific database and then you do whatever you need. Changing the call to be async is actually a good idea. We could extend the api to have an execute_async in addition to what we have maybe?

I am thinking that ideally we have an abstract type for the execute callback argument as well? Otherwise we end up with implementation specific code in services and elsewhere. Then again execute loses it's power to support implementation specific functionality. Just wondering how much we are willing to break abstraction to use db specific stuff anyway.

Maybe a good compromise for most use cases is to have something like in_transaction function that takes abstract Transaction as argument that has the same interface as StorageBackend(except execute)? Transaction would be a more generic abstraction than storing a batch as initially suggested.

(Actually I suggested to have async execute because StorageBackend is async. Maybe not needed if we go with db specific type in execute callback though...)

@danielSanchezQ
Copy link
Collaborator

I dont think we can abstract an execute generically because it hard depends on the actual underlying implementation. The only thing we can do here if abstract in the StorageTransaction trait (lets rename this) an executable that does what we need (builds it up). Otherwise it is up to the user of the service. That way we keep both options. But if you have another suggestion on how to do it generically I'm curious about it.

@andrussal
Copy link
Contributor Author

I dont think we can abstract an execute generically because it hard depends on the actual underlying implementation. The only thing we can do here if abstract in the StorageTransaction trait (lets rename this) an executable that does what we need (builds it up). Otherwise it is up to the user of the service. That way we keep both options. But if you have another suggestion on how to do it generically I'm curious about it.

I agree it's good to keep KV interface the same. Perhaps another option is to implement an additional trait for db backend implementation and bring it into scope when needed.

trait StorageExt {
   fn write_in_batch(...)
}

impl StorageExt for rocksdb:DB {
     fn write_in_batch(...)
}
use StorageExt;

fn store<DB: StorageExt>() {
      let exec = |db: DB| {
           db.write_in_batch(...);
      }
}

@danielSanchezQ
Copy link
Collaborator

I dont think we can abstract an execute generically because it hard depends on the actual underlying implementation. The only thing we can do here if abstract in the StorageTransaction trait (lets rename this) an executable that does what we need (builds it up). Otherwise it is up to the user of the service. That way we keep both options. But if you have another suggestion on how to do it generically I'm curious about it.

I agree it's good to keep KV interface the same. Perhaps another option is to implement an additional trait for db backend implementation and bring it into scope when needed.

trait StorageExt {
   fn write_in_batch(...)
}

impl StorageExt for rocksdb:DB {
     fn write_in_batch(...)
}
use StorageExt;

fn store<DB: StorageExt>() {
      let exec = |db: DB| {
           db.write_in_batch(...);
      }
}

Yeah, this similar extending the current StorageTransaction trait, but as a separate trait (maybe as an extension trait?). Anyway, you will still need the specific type to use it. So I think extending the trait for building the "execution operations" is a better approach.

@danielSanchezQ
Copy link
Collaborator

@andrussal as per our last conversation. I think we can simply close this one right?

@andrussal
Copy link
Contributor Author

@andrussal as per our last conversation. I think we can simply close this one right?

@andrussal andrussal closed this Mar 12, 2025
@andrussal andrussal deleted the feat/storage-batch-write branch March 12, 2025 10:29
@andrussal
Copy link
Contributor Author

@andrussal as per our last conversation. I think we can simply close this one right?

Yep, it's not worth doing it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants