@@ -410,64 +410,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
410410 ) ?;
411411 }
412412 WriteBatch :: Serial ( batch) => {
413- let mut task_items_result = Ok ( Vec :: new ( ) ) ;
414- turbo_tasks:: scope ( |s| {
415- s. spawn ( |_| {
416- task_items_result =
417- process_task_data ( snapshots, None :: < & T :: ConcurrentWriteBatch < ' _ > > ) ;
418- } ) ;
419-
420- let mut next_task_id =
421- get_next_free_task_id :: <
422- T :: SerialWriteBatch < ' _ > ,
423- T :: ConcurrentWriteBatch < ' _ > ,
424- > ( & mut WriteBatchRef :: serial ( batch) ) ?;
425-
426- {
427- let _span = tracing:: trace_span!(
428- "update task cache" ,
429- items = task_cache_updates. iter( ) . map( |m| m. len( ) ) . sum:: <usize >( )
430- )
431- . entered ( ) ;
432- let mut task_type_bytes = Vec :: new ( ) ;
433- for ( task_type, task_id) in task_cache_updates. into_iter ( ) . flatten ( ) {
434- let task_id = * task_id;
435- serialize_task_type ( & task_type, & mut task_type_bytes, task_id) ?;
436-
437- batch
438- . put (
439- KeySpace :: ForwardTaskCache ,
440- WriteBuffer :: Borrowed ( & task_type_bytes) ,
441- WriteBuffer :: Borrowed ( & task_id. to_le_bytes ( ) ) ,
442- )
443- . with_context ( || {
444- anyhow ! ( "Unable to write task cache {task_type:?} => {task_id}" )
445- } ) ?;
446- batch
447- . put (
448- KeySpace :: ReverseTaskCache ,
449- WriteBuffer :: Borrowed ( IntKey :: new ( task_id) . as_ref ( ) ) ,
450- WriteBuffer :: Borrowed ( & task_type_bytes) ,
451- )
452- . with_context ( || {
453- anyhow ! ( "Unable to write task cache {task_id} => {task_type:?}" )
454- } ) ?;
455- next_task_id = next_task_id. max ( task_id + 1 ) ;
456- }
457- }
458-
459- save_infra :: < T :: SerialWriteBatch < ' _ > , T :: ConcurrentWriteBatch < ' _ > > (
460- & mut WriteBatchRef :: serial ( batch) ,
461- next_task_id,
462- session_id,
463- operations,
464- ) ?;
465- anyhow:: Ok ( ( ) )
466- } ) ?;
467-
468413 {
469414 let _span = tracing:: trace_span!( "update tasks" ) . entered ( ) ;
470- for ( task_id, meta, data) in task_items_result?. into_iter ( ) . flatten ( ) {
415+ let task_items =
416+ process_task_data ( snapshots, None :: < & T :: ConcurrentWriteBatch < ' _ > > ) ?;
417+ for ( task_id, meta, data) in task_items. into_iter ( ) . flatten ( ) {
471418 let key = IntKey :: new ( * task_id) ;
472419 let key = key. as_ref ( ) ;
473420 if let Some ( meta) = meta {
@@ -485,7 +432,54 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
485432 } ) ?;
486433 }
487434 }
435+ batch. flush ( KeySpace :: TaskMeta ) ?;
436+ batch. flush ( KeySpace :: TaskData ) ?;
437+ }
438+
439+ let mut next_task_id = get_next_free_task_id :: <
440+ T :: SerialWriteBatch < ' _ > ,
441+ T :: ConcurrentWriteBatch < ' _ > ,
442+ > ( & mut WriteBatchRef :: serial ( batch) ) ?;
443+
444+ {
445+ let _span = tracing:: trace_span!(
446+ "update task cache" ,
447+ items = task_cache_updates. iter( ) . map( |m| m. len( ) ) . sum:: <usize >( )
448+ )
449+ . entered ( ) ;
450+ let mut task_type_bytes = Vec :: new ( ) ;
451+ for ( task_type, task_id) in task_cache_updates. into_iter ( ) . flatten ( ) {
452+ let task_id = * task_id;
453+ serialize_task_type ( & task_type, & mut task_type_bytes, task_id) ?;
454+
455+ batch
456+ . put (
457+ KeySpace :: ForwardTaskCache ,
458+ WriteBuffer :: Borrowed ( & task_type_bytes) ,
459+ WriteBuffer :: Borrowed ( & task_id. to_le_bytes ( ) ) ,
460+ )
461+ . with_context ( || {
462+ anyhow ! ( "Unable to write task cache {task_type:?} => {task_id}" )
463+ } ) ?;
464+ batch
465+ . put (
466+ KeySpace :: ReverseTaskCache ,
467+ WriteBuffer :: Borrowed ( IntKey :: new ( task_id) . as_ref ( ) ) ,
468+ WriteBuffer :: Borrowed ( & task_type_bytes) ,
469+ )
470+ . with_context ( || {
471+ anyhow ! ( "Unable to write task cache {task_id} => {task_type:?}" )
472+ } ) ?;
473+ next_task_id = next_task_id. max ( task_id + 1 ) ;
474+ }
488475 }
476+
477+ save_infra :: < T :: SerialWriteBatch < ' _ > , T :: ConcurrentWriteBatch < ' _ > > (
478+ & mut WriteBatchRef :: serial ( batch) ,
479+ next_task_id,
480+ session_id,
481+ operations,
482+ ) ?;
489483 }
490484 }
491485
0 commit comments