Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import gevent
import google.cloud.storage as storage
import pytz
from gevent.lock import Semaphore
from google.api_core import retry
from google.cloud.bigtable import row_filters, row_set
from google.cloud.bigtable.row import Row
from google.cloud.bigtable.row import AppendRow, ConditionalRow, DirectRow, Row
from minio import Minio
from minio.error import S3Error
from osprey.engine.executor.execution_context import ExecutionResult
Expand Down Expand Up @@ -177,10 +178,18 @@ def _censor_data(
)


# Number of rows buffered in memory before insert() flushes them to Bigtable
# in a single batched mutate_rows call.
BIGTABLE_INSERT_QUEUE_MAX_SIZE = 75


# TODO: Add tests
class StoredExecutionResultBigTable(ExecutionResultStore):
# Pending rows awaiting a batched mutate_rows flush.
# NOTE(review): this is a mutable CLASS-level list, so until insert() rebinds
# self.queued_events after the first flush, all instances share one buffer —
# confirm this store is used as a singleton, or move initialization into __init__.
queued_events: List[AppendRow | ConditionalRow | DirectRow] = []

# Retry policy applied to mutate_rows: exponential backoff starting at 1s,
# capped at 2s per attempt, giving up after a 120s overall deadline.
retry_policy = retry.Retry(initial=1.0, maximum=2.0, multiplier=1.25, deadline=120.0)

def __init__(self) -> None:
    """Initialize the store with an empty per-instance insert queue.

    The lock serializes access to ``queued_events`` across concurrent
    gevent greenlets calling ``insert``.
    """
    self._lock = Semaphore()
    # Bug fix: bind the queue per instance. Relying on the mutable
    # class-level ``queued_events = []`` default means every instance
    # shares (and flushes) the same pending-row buffer until the first
    # flush rebinds it; initializing here makes each instance's queue
    # independent from the start. Interface and flush behavior unchanged.
    self.queued_events = []

def select_one(self, action_id: int) -> Optional[Dict[str, Any]]:
row = osprey_bigtable.table('stored_execution_result').read_row(
StoredExecutionResultBigTable._encode_action_id(action_id), row_filters.CellsColumnLimitFilter(1)
Expand Down Expand Up @@ -229,7 +238,20 @@ def insert(
row.set_cell('execution_result', b'error_traces', error_traces_json.encode(), timestamp=timestamp)
row.set_cell('execution_result', b'timestamp', timestamp.isoformat().encode(), timestamp=timestamp)
row.set_cell('execution_result', b'action_data', action_data_json.encode(), timestamp=timestamp)
osprey_bigtable.table('stored_execution_result').mutate_rows([row], retry=self.retry_policy)

with self._lock:
self.queued_events.append(row)

if len(self.queued_events) < BIGTABLE_INSERT_QUEUE_MAX_SIZE:
return

to_insert = self.queued_events.copy()

self.queued_events = []

# TODO(review): handle partial failures — mutate_rows returns per-row status
# results that are ignored here, so individual failed rows are dropped silently.
osprey_bigtable.table('stored_execution_result').mutate_rows(to_insert, retry=self.retry_policy)

@staticmethod
def _encode_action_id(action_id_snowflake: int) -> bytes:
Expand Down
Loading