Skip to content

Commit 08ba71c

Browse files
authored
fix: change how we generate start/end times for ScheduledDataTime events if merge_events param is True
2 parents 97db7f2 + 2f0c78d commit 08ba71c

File tree

8 files changed

+128
-10
lines changed

8 files changed

+128
-10
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
## [1.14.0] - 2025-04-17
10+
### Fixed
11+
- `merge_events` parameter for scheduled data time apps now results in correct start/end times in the final app event.
12+
913
## [1.13.1] - 2025-02-17
1014
### Fixed
1115
- Documentation 404 at GitHub pages
@@ -401,7 +405,9 @@ env variables, that should be used to configure logging.
401405
- Event classes: `StreamEvent`, `ScheduledEvent` and `TaskEvent`.
402406

403407

404-
[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.13.0...master
408+
[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.14.0...master
409+
[1.14.0] https://github.com/corva-ai/python-sdk/compare/v1.13.1...v1.14.0
410+
[1.13.1] https://github.com/corva-ai/python-sdk/compare/v1.13.0...v1.13.1
405411
[1.13.0] https://github.com/corva-ai/python-sdk/compare/v1.12.1...v1.13.0
406412
[1.12.1] https://github.com/corva-ai/python-sdk/compare/v1.12.0...v1.12.1
407413
[1.12.0] https://github.com/corva-ai/python-sdk/compare/v1.11.4...v1.12.0

docs/antora-playbook.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ content:
77
start_path: docs
88
branches: []
99
# branches: HEAD # Use this for local development
10-
tags: [v1.13.1]
10+
tags: [v1.14.0]
1111
asciidoc:
1212
attributes:
1313
page-toclevels: 5

docs/antora.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
name: corva-sdk
2-
version: ~
2+
version: 1.14.0
33
nav: [modules/ROOT/nav.adoc]

src/corva/handlers.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323
from corva.api import Api
2424
from corva.configuration import SETTINGS
2525
from corva.logger import CORVA_LOGGER, CorvaLoggerHandler, LoggingContext
26+
from corva.models import validators
2627
from corva.models.base import RawBaseEvent
2728
from corva.models.context import CorvaContext
2829
from corva.models.merge.merge import PartialRerunMergeEvent
2930
from corva.models.merge.raw import RawPartialRerunMergeEvent
30-
from corva.models.scheduled.raw import RawScheduledEvent
31+
from corva.models.scheduled.raw import RawScheduledDataTimeEvent, RawScheduledEvent
3132
from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
3233
from corva.models.scheduled.scheduler_type import SchedulerType
3334
from corva.models.stream.raw import RawStreamEvent
@@ -300,6 +301,11 @@ def wrapper(
300301
hash_name=hash_name, redis_client=redis_client
301302
)
302303

304+
if isinstance(event, RawScheduledDataTimeEvent) and event.merge_metadata:
305+
event = event.rebuild_with_modified_times(
306+
event.merge_metadata.start_time, event.merge_metadata.end_time
307+
)
308+
303309
app_event = event.scheduler_type.event.parse_obj(event)
304310

305311
with LoggingContext(
@@ -589,7 +595,9 @@ def _merge_events(
589595
"""
590596
Merges incoming aws_events into one.
591597
Merge happens differently, depending on app type.
592-
Only "scheduled" and "stream" type of apps can be processed here.
598+
Only "scheduled" (data and depth) and "stream" type of apps can be processed here.
599+
Scheduled Natural events don't need a merge since they will never receive multiple
600+
events in a batch.
593601
If somehow any other type is passed - raise an exception
594602
"""
595603
if not isinstance(
@@ -610,11 +618,24 @@ def _merge_events(
610618
if is_depth
611619
else ("schedule_start", "schedule_end")
612620
)
613-
min_event_start = min(e[event_start] for e in aws_event)
621+
min_event_start: int = min(e[event_start] for e in aws_event)
614622
max_event_end = max(
615623
(e[event_end] for e in aws_event if e.get(event_end) is not None),
616624
default=None,
617625
)
626+
if not is_depth:
627+
# we're working with ScheduledDataTimeEvent
628+
max_event_start: int = max(e[event_start] for e in aws_event)
629+
interval = aws_event[0]["interval"]
630+
# cast from ms to s if needed
631+
min_value = validators.from_ms_to_s(min_event_start)
632+
max_value = validators.from_ms_to_s(max_event_start)
633+
aws_event[0]["merge_metadata"] = {
634+
"start_time": int(min_value - interval + 1),
635+
"end_time": int(max_value),
636+
"number_of_merged_events": len(aws_event),
637+
}
638+
618639
aws_event[0][event_start] = min_event_start
619640
if max_event_end:
620641
aws_event[0][event_end] = max_event_end

src/corva/models/scheduled/raw.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ def set_schedule_as_completed(self, api: Api) -> None:
5656
api.post(path=f'scheduler/{self.schedule_id}/completed')
5757

5858

59+
class DataTimeMergeMetadata(CorvaBaseEvent):
60+
"""
61+
For data time events we may need to store information about event merging
62+
(if merge_events=True is used in @scheduled)
63+
"""
64+
65+
start_time: int
66+
end_time: int
67+
number_of_merged_events: int
68+
69+
5970
class RawScheduledDataTimeEvent(RawScheduledEvent):
6071
"""Raw data time scheduled event data.
6172
@@ -71,6 +82,7 @@ class RawScheduledDataTimeEvent(RawScheduledEvent):
7182
start_time: int = None # type: ignore
7283
interval: float
7384
rerun: Optional[RerunTime] = None
85+
merge_metadata: Optional[DataTimeMergeMetadata] = None
7486

7587
# validators
7688
_set_schedule_start = pydantic.validator('schedule_start', allow_reuse=True)(
@@ -84,6 +96,14 @@ def set_start_time(cls, values: dict) -> dict:
8496
values["start_time"] = int(values["schedule_start"] - values["interval"] + 1)
8597
return values
8698

99+
def rebuild_with_modified_times(
100+
self, start_time: int, end_time: int
101+
) -> RawScheduledDataTimeEvent:
102+
raw_dict = self.dict(exclude_none=True, by_alias=True)
103+
raw_dict["start_time"] = start_time
104+
raw_dict["end_time"] = end_time
105+
return RawScheduledDataTimeEvent.parse_obj(raw_dict)
106+
87107

88108
class RawScheduledDepthEvent(RawScheduledEvent):
89109
"""Raw depth scheduled event data.

src/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = "1.13.1"
1+
VERSION = "1.14.0"

tests/unit/test_docs/test_merging.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ def test_tutorial001(context):
7070
@pytest.mark.parametrize(
7171
"time_ranges, flat",
7272
(
73-
[((60, 120), (61, None), (62, 122)), True],
74-
[((61, None), (60, None), (62, None)), False],
73+
[((60, 120), (120, 180), (180, 240)), True],
74+
[((120, 180), (60, 120), (180, 240)), False],
7575
),
7676
)
7777
def test_tutorial002(context, time_ranges, flat):
@@ -97,6 +97,6 @@ def test_tutorial002(context, time_ranges, flat):
9797
result_event: ScheduledDataTimeEvent = tutorial002.app(event, context)[0]
9898

9999
assert result_event.start_time == 1
100-
assert result_event.end_time == 60
100+
assert result_event.end_time == 180
101101
max_schedule_value = time_ranges[-1][-1]
102102
assert result_event.schedule_end == max_schedule_value # type: ignore[attr-defined]

tests/unit/test_scheduled_app.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import re
3+
from copy import deepcopy
34

45
import pytest
56
import redis
@@ -532,3 +533,73 @@ def scheduled_app(event, api, cache):
532533

533534
with pytest.raises(redis.exceptions.ConnectionError):
534535
scheduled_app(event, context)
536+
537+
538+
@pytest.mark.parametrize("merge_events", [True, False])
539+
def test_merge_events_parameter(merge_events, context, mocker):
540+
@scheduled(merge_events=merge_events)
541+
def scheduled_app(event, api, state):
542+
# For this test, just return the processed event list for inspection.
543+
return event
544+
545+
# Create two separate events with different schedule_start values.
546+
# Note: schedule_start is provided in milliseconds.
547+
event1 = RawScheduledDataTimeEvent(
548+
asset_id=1,
549+
interval=60, # interval value in seconds
550+
schedule=123,
551+
schedule_start=1744718400000, # 2025-04-15T12:00:00 in milliseconds
552+
schedule_end=1744718460000, # 2025-04-15T12:01:00 in milliseconds
553+
app_connection=1,
554+
app_stream=2,
555+
company=1,
556+
scheduler_type=SchedulerType.data_time,
557+
).dict(by_alias=True, exclude_unset=True)
558+
559+
event2 = RawScheduledDataTimeEvent(
560+
asset_id=1,
561+
interval=60,
562+
schedule=124,
563+
schedule_start=1744718460000, # 2025-04-15T12:01:00 in milliseconds
564+
schedule_end=1744718520000, # 2025-04-15T12:02:00 in milliseconds
565+
app_connection=1,
566+
app_stream=2,
567+
company=1,
568+
scheduler_type=SchedulerType.data_time,
569+
).dict(by_alias=True, exclude_unset=True)
570+
original_event1 = deepcopy(event1)
571+
572+
# Combine the events in the input structure
573+
events = [[event1, event2]]
574+
575+
# Call the scheduled app which should process the events.
576+
result = scheduled_app(events, context)
577+
578+
if merge_events:
579+
# When merging is enabled, the app should merge the input events
580+
# into a single event.
581+
assert (
582+
len(result) == 1
583+
), "Expected a single merged event when merge_events is true."
584+
merged_event = result[0]
585+
586+
# Calculate the expected start_time and end_time.
587+
expected_start_time = (
588+
original_event1["schedule_start"] - original_event1["interval"] + 1
589+
)
590+
expected_end_time = event2["schedule_start"]
591+
592+
assert (
593+
merged_event.start_time == expected_start_time
594+
), f"Expected time {expected_start_time}, got {merged_event.start_time}."
595+
# The merged event is expected to have an 'end_time' attribute set.
596+
actual_end_time = getattr(merged_event, "end_time", None)
597+
assert (
598+
actual_end_time == expected_end_time
599+
), f"Expected merged end_time {expected_end_time}, got {actual_end_time}."
600+
else:
601+
# When merging is disabled, the app should return the events as-is
602+
# (i.e. two separate events).
603+
assert (
604+
len(result) == 2
605+
), "Expected two separate events when merge_events is false."

0 commit comments

Comments
 (0)