diff --git a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py index ea082c73..49412c5a 100644 --- a/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py +++ b/osprey_worker/src/osprey/worker/lib/storage/stored_execution_result.py @@ -10,9 +10,10 @@ import gevent import google.cloud.storage as storage import pytz +from gevent.lock import Semaphore from google.api_core import retry from google.cloud.bigtable import row_filters, row_set -from google.cloud.bigtable.row import Row +from google.cloud.bigtable.row import AppendRow, ConditionalRow, DirectRow, Row from minio import Minio from minio.error import S3Error from osprey.engine.executor.execution_context import ExecutionResult @@ -177,10 +178,18 @@ def _censor_data( ) +BIGTABLE_INSERT_QUEUE_MAX_SIZE = 75 + + # TODO: Add tests class StoredExecutionResultBigTable(ExecutionResultStore): + queued_events: List[AppendRow | ConditionalRow | DirectRow] = [] + retry_policy = retry.Retry(initial=1.0, maximum=2.0, multiplier=1.25, deadline=120.0) + def __init__(self): + self._lock = Semaphore() + def select_one(self, action_id: int) -> Optional[Dict[str, Any]]: row = osprey_bigtable.table('stored_execution_result').read_row( StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1) @@ -229,7 +238,20 @@ def insert( row.set_cell('execution_result', b'error_traces', error_traces_json.encode(), timestamp=timestamp) row.set_cell('execution_result', b'timestamp', timestamp.isoformat().encode(), timestamp=timestamp) row.set_cell('execution_result', b'action_data', action_data_json.encode(), timestamp=timestamp) - osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy) + + with self._lock: + self.queued_events.append(row) + + if len(self.queued_events) < BIGTABLE_INSERT_QUEUE_MAX_SIZE: + return + + to_insert = self.queued_events.copy() + + self.queued_events = [] + + # this should technically be handling failures probably, though it wasn't as is so im not sure where + # or if that usually happens + osprey_bigtable.table('stored_execution_result').mutate_rows(to_insert, retry=self.retry_policy) @staticmethod def _encode_action_id(action_id_snowflake: int) -> bytes: