Skip to content

Commit 04067bb

Browse files
committed
fix: change how we generate start/end times for ScheduledDataTime events if merge_events param is True
1 parent 97db7f2 commit 04067bb

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

src/corva/handlers.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323
from corva.api import Api
2424
from corva.configuration import SETTINGS
2525
from corva.logger import CORVA_LOGGER, CorvaLoggerHandler, LoggingContext
26+
from corva.models import validators
2627
from corva.models.base import RawBaseEvent
2728
from corva.models.context import CorvaContext
2829
from corva.models.merge.merge import PartialRerunMergeEvent
2930
from corva.models.merge.raw import RawPartialRerunMergeEvent
30-
from corva.models.scheduled.raw import RawScheduledEvent
31+
from corva.models.scheduled.raw import RawScheduledDataTimeEvent, RawScheduledEvent
3132
from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
3233
from corva.models.scheduled.scheduler_type import SchedulerType
3334
from corva.models.stream.raw import RawStreamEvent
@@ -300,6 +301,11 @@ def wrapper(
300301
hash_name=hash_name, redis_client=redis_client
301302
)
302303

304+
if isinstance(event, RawScheduledDataTimeEvent) and event.merge_metadata:
305+
event = event.rebuild_with_modified_times(
306+
event.merge_metadata.start_time, event.merge_metadata.end_time
307+
)
308+
303309
app_event = event.scheduler_type.event.parse_obj(event)
304310

305311
with LoggingContext(
@@ -589,7 +595,9 @@ def _merge_events(
589595
"""
590596
Merges incoming aws_events into one.
591597
Merge happens differently, depending on app type.
592-
Only "scheduled" and "stream" type of apps can be processed here.
598+
Only "scheduled" (data and depth) and "stream" types of apps can be processed here.
599+
Scheduled Natural events don't need a merge since they will never receive multiple
600+
events in batch.
593601
If somehow any other type is passed - raise an exception
594602
"""
595603
if not isinstance(
@@ -610,11 +618,24 @@ def _merge_events(
610618
if is_depth
611619
else ("schedule_start", "schedule_end")
612620
)
613-
min_event_start = min(e[event_start] for e in aws_event)
621+
min_event_start: int = min(e[event_start] for e in aws_event)
614622
max_event_end = max(
615623
(e[event_end] for e in aws_event if e.get(event_end) is not None),
616624
default=None,
617625
)
626+
if not is_depth:
627+
# we're working with ScheduledDataTimeEvent
628+
max_event_start: int = max(e[event_start] for e in aws_event)
629+
interval = aws_event[0]["interval"]
630+
# cast from ms to s if needed
631+
min_value = validators.from_ms_to_s(min_event_start)
632+
max_value = validators.from_ms_to_s(max_event_start)
633+
aws_event[0]["merge_metadata"] = {
634+
"start_time": int(min_value - interval + 1),
635+
"end_time": int(max_value),
636+
"number_of_merged_events": len(aws_event),
637+
}
638+
618639
aws_event[0][event_start] = min_event_start
619640
if max_event_end:
620641
aws_event[0][event_end] = max_event_end

src/corva/models/scheduled/raw.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ def set_schedule_as_completed(self, api: Api) -> None:
5656
api.post(path=f'scheduler/{self.schedule_id}/completed')
5757

5858

59+
class DataTimeMergeMetadata(CorvaBaseEvent):
    """Bookkeeping attached to a data-time event when batch merging occurs.

    Populated only when ``merge_events=True`` is used in ``@scheduled``;
    records the combined time window and how many raw events were folded
    into the single event the app receives.
    """

    # Start of the merged window (seconds; converted from ms upstream).
    start_time: int
    # End of the merged window (seconds; converted from ms upstream).
    end_time: int
    # Count of raw AWS events collapsed into this one event.
    number_of_merged_events: int
68+
69+
5970
class RawScheduledDataTimeEvent(RawScheduledEvent):
6071
"""Raw data time scheduled event data.
6172
@@ -71,6 +82,7 @@ class RawScheduledDataTimeEvent(RawScheduledEvent):
7182
start_time: int = None # type: ignore
7283
interval: float
7384
rerun: Optional[RerunTime] = None
85+
merge_metadata: Optional[DataTimeMergeMetadata] = None
7486

7587
# validators
7688
_set_schedule_start = pydantic.validator('schedule_start', allow_reuse=True)(
@@ -84,6 +96,14 @@ def set_start_time(cls, values: dict) -> dict:
8496
values["start_time"] = int(values["schedule_start"] - values["interval"] + 1)
8597
return values
8698

99+
def rebuild_with_modified_times(
    self, start_time: int, end_time: int
) -> RawScheduledDataTimeEvent:
    """Return a validated copy of this event with overridden start/end times.

    Serializes the event back to a raw dict (honoring field aliases and
    dropping ``None`` values), patches ``start_time``/``end_time``, and
    re-parses through pydantic so the copy is validated.
    """
    payload = self.dict(exclude_none=True, by_alias=True)
    payload.update(start_time=start_time, end_time=end_time)
    return RawScheduledDataTimeEvent.parse_obj(payload)
106+
87107

88108
class RawScheduledDepthEvent(RawScheduledEvent):
89109
"""Raw depth scheduled event data.

tests/unit/test_scheduled_app.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import re
3+
from copy import deepcopy
34

45
import pytest
56
import redis
@@ -532,3 +533,73 @@ def scheduled_app(event, api, cache):
532533

533534
with pytest.raises(redis.exceptions.ConnectionError):
534535
scheduled_app(event, context)
536+
537+
538+
@pytest.mark.parametrize("merge_events", [True, False])
def test_merge_events_parameter(merge_events, context, mocker):
    """merge_events=True collapses a batch into one event; False passes both through."""

    @scheduled(merge_events=merge_events)
    def scheduled_app(event, api, state):
        # Echo the processed event(s) back so the test can inspect them.
        return event

    def make_raw_event(schedule_id, start_ms, end_ms):
        # schedule_start / schedule_end are supplied in milliseconds.
        return RawScheduledDataTimeEvent(
            asset_id=1,
            interval=60,  # interval value in seconds
            schedule=schedule_id,
            schedule_start=start_ms,
            schedule_end=end_ms,
            app_connection=1,
            app_stream=2,
            company=1,
            scheduler_type=SchedulerType.data_time,
        ).dict(by_alias=True, exclude_unset=True)

    event1 = make_raw_event(123, 1744718400000, 1744718460000)  # 12:00 -> 12:01
    event2 = make_raw_event(124, 1744718460000, 1744718520000)  # 12:01 -> 12:02
    # Snapshot event1 before the call: merging mutates the first event in place.
    original_event1 = deepcopy(event1)

    result = scheduled_app([[event1, event2]], context)

    if not merge_events:
        # Without merging the two events pass through untouched.
        assert (
            len(result) == 2
        ), "Expected two separate events when merge_events is false."
        return

    # With merging enabled, the batch collapses into a single event.
    assert (
        len(result) == 1
    ), "Expected a single merged event when merge_events is true."
    merged_event = result[0]

    # Expected window: (first start - interval + 1) .. last start.
    expected_start_time = (
        original_event1["schedule_start"] - original_event1["interval"] + 1
    )
    expected_end_time = event2["schedule_start"]

    assert (
        merged_event.start_time == expected_start_time
    ), f"Expected time {expected_start_time}, got {merged_event.start_time}."
    # The merged event is expected to have an 'end_time' attribute set.
    actual_end_time = getattr(merged_event, "end_time", None)
    assert (
        actual_end_time == expected_end_time
    ), f"Expected merged end_time {expected_end_time}, got {actual_end_time}."
0 commit comments

Comments
 (0)