
Pull in hopping window table fix #412

Merged: 25 commits, Mar 30, 2024

Conversation

wbarnha
Member

@wbarnha wbarnha commented Nov 18, 2022

Address additional changes to close #195 by pulling in forgotten changes from #349.

@wbarnha wbarnha changed the title Doncat tables fix Pull in hopping window table fix Nov 18, 2022
@thomas-chauvet
Contributor

thomas-chauvet commented Nov 21, 2022

Hi,

This PR is really good because it will fix a long-standing issue in Faust: an improper (wrong) way of handling hopping windows.

I used the code for _del_old_keys as a patch in my own code. I noticed it doesn't work properly when a window has multiple keys, for instance when we have windows of the form (key_a, (t1, t2)) and (key_b, (t2, t3)). The fix will mix the different keys into the same window-closing trigger.

I tried to fix the issue in this snippet (which is not perfect) if you want to use it:

async def _del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # processed_window is (key, (window_start, window_end)):
                            # processed_window[0] is the message key, not a
                            # timestamp.  A list (not a set) is used because
                            # events may be unhashable values such as dicts.
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )

                for key_to_remove in keys_to_remove:
                    self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove, window_data[key_to_remove[0]]
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )

The idea is to keep separate window_data per key. It should also work when there is only one key.
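The per-key separation can be illustrated with plain dicts standing in for the table's internals (hypothetical data, not Faust structures):

```python
# Hypothetical illustration: two keys whose windows close at the same
# timestamp.  Keys mimic Faust's (message_key, (window_start, window_end)).
data = {
    ("key_a", (0.0, 9.9)): ["a1", "a2"],
    ("key_b", (0.0, 9.9)): ["b1"],
}

# Collecting per timestamp only (the buggy shape) merges both keys' events:
merged = []
for events in data.values():
    merged.extend(events)
print(merged)  # ['a1', 'a2', 'b1'] -- key_a's close callback sees key_b data

# Collecting per key, as in the snippet above, keeps them apart:
window_data = {}
for window_key, events in data.items():
    window_data.setdefault(window_key[0], []).extend(events)
print(window_data["key_a"])  # ['a1', 'a2']
```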

The main issue I still have is that the window is shifted by an offset: for the window (t1, t2) I end up with data from (t1 + offset, t2 + offset), where the offset equals the window step. I didn't find the trick to fix this.
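The shift can be reasoned about with plain arithmetic, independent of Faust. Below is a minimal sketch of hopping-window range computation; the helper name and the 0.1-second end inset are my own assumptions for illustration, not Faust's actual `_window_ranges`:

```python
# A rough sketch (not Faust's internals) of hopping-window range arithmetic,
# to reproduce the shift with plain numbers.
def hopping_ranges(timestamp: float, size: float, step: float):
    """Yield (start, end) for every hopping window containing timestamp."""
    latest = (timestamp // step) * step   # start of the latest window
    start = latest - size + step          # start of the earliest window
    while start <= latest:
        yield (start, start + size - 0.1)
        start += step

# With size=10 and step=5, timestamp 17.0 belongs to two windows:
print(list(hopping_ranges(17.0, 10, 5)))  # [(10.0, 19.9), (15.0, 24.9)]
```

Note that the window starting at 15.0 still accepts data up to 24.9, i.e. step seconds past the previous window's end, which is where the apparent offset comes from when a close is triggered per timestamp.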

You will find below a full, working, self-contained example. The window-closing function does nothing but print the elements in the window.

import sys
import time
import types
from datetime import datetime, timedelta
from heapq import heappop
from typing import cast

import faust
from faust.types.windows import WindowT


async def _custom_del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # processed_window is (key, (window_start, window_end)):
                            # processed_window[0] is the message key, not a
                            # timestamp.  A list (not a set) is used because
                            # events may be unhashable values such as dicts.
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )

                for key_to_remove in keys_to_remove:
                    self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove, window_data[key_to_remove[0]]
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )


class RawModel(faust.Record):
    key: str
    date: datetime
    value: float


TOPIC = "raw-event"
TABLE = "hopping_table"
KAFKA = "kafka://localhost:29092"
CLEANUP_INTERVAL = 1
WINDOW = 10
STEP = 5
WINDOW_EXPIRES = WINDOW + 2
PARTITIONS = 1

app = faust.App(
    "windowed-hopping-2",
    broker=KAFKA,
    topic_partitions=PARTITIONS,
    topic_disable_leader=True,
)

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)


def window_processor(key, events):
    """
    The real window is in fact from window_start + step to window_end + step.
    """
    window_start = key[1][0]
    window_end = key[1][1]
    count = len(events)
    print(f"Window {window_start} - {window_end} has {count} events")
    # sort dict for clear view
    res = {event.date: (event.key, event.value) for event in events}
    res = dict(sorted(res.items()))
    for key, value in res.items():
        print(f"{key}: {value}")


table = app.Table(
    TABLE,
    default=list,
    partitions=PARTITIONS,
    on_window_close=window_processor,
)

table._del_old_keys = types.MethodType(_custom_del_old_keys, table)

hopping_table = table.hopping(
    WINDOW, STEP, expires=timedelta(seconds=WINDOW_EXPIRES)
).relative_to_field(RawModel.date)


@app.agent(source)
async def print_windowed_events(stream):
    event: RawModel
    async for event in stream:
        value_list = hopping_table[event.key].value()
        value_list.append(event)
        hopping_table[event.key] = value_list


@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=1, date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=999, date=time.time()), key="b")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.argv.extend(["worker", "-l", "info"])
    app.main()

@thomas-chauvet
Contributor

I edited the previous message to use a list instead of a set, to avoid issues with unhashable types such as dict.

window_data.setdefault(processed_window[0], []).extend(self.data.get(processed_window, []))
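For reference, the reason a set cannot be used here fits in a few lines: Faust table values are often plain dicts, and dicts are unhashable (the event value below is made up for illustration):

```python
# Dicts (a common table value) are unhashable, so putting an event into a
# set raises immediately; a list accepts it, at the cost of allowing
# duplicates.
event = {"date": 1669189160.0, "key": "b", "value": 999}

try:
    deduplicated = {event}  # set literal requires hashable elements
except TypeError as exc:
    print(exc)  # unhashable type: 'dict'

window_data = {}
window_data.setdefault("b", []).extend([event, event])
print(len(window_data["b"]))  # 2 -- duplicates are kept
```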

@codecov-commenter

codecov-commenter commented Nov 22, 2022

Codecov Report

Attention: Patch coverage is 70.00000%, with 3 lines in your changes missing coverage. Please review.

Project coverage is 93.69%. Comparing base (ed85356) to head (c103bce).

Files                  Patch %   Lines
faust/tables/base.py   70.00%    2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #412      +/-   ##
==========================================
- Coverage   93.71%   93.69%   -0.03%     
==========================================
  Files         102      102              
  Lines       11110    11117       +7     
  Branches     1545     1550       +5     
==========================================
+ Hits        10412    10416       +4     
- Misses        607      609       +2     
- Partials       91       92       +1     


@wbarnha
Member Author

wbarnha commented Nov 22, 2022

I substituted your timed producers with:

@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=random.randint(1, 10), date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=random.randint(1, 10), date=time.time()), key="b")

And I see this:

[2022-11-22 15:55:27,366] [22633] [WARNING] Window 1669150505.0 - 1669150514.9 has 3 events 
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150512.0197647: ('a', 8) 
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150513.0207028: ('a', 2) 
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150514.0224366: ('a', 1) 
[2022-11-22 15:55:27,366] [22633] [WARNING] Window 1669150505.0 - 1669150514.9 has 3 events 
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150512.0200157: ('b', 1) 
[2022-11-22 15:55:27,367] [22633] [WARNING] 1669150513.0205472: ('b', 6) 
[2022-11-22 15:55:27,367] [22633] [WARNING] 1669150514.0226176: ('b', 6) 
[2022-11-22 15:55:32,366] [22633] [WARNING] Window 1669150510.0 - 1669150519.9 has 5 events 
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150515.0222416: ('b', 8) 
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150516.0247138: ('b', 3) 
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150517.0247314: ('b', 7) 
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150518.0267282: ('b', 4) 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150519.0277123: ('b', 5) 
[2022-11-22 15:55:32,367] [22633] [WARNING] Window 1669150510.0 - 1669150519.9 has 5 events 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150515.0224657: ('a', 1) 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150516.024582: ('a', 9) 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150517.0249085: ('a', 10) 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150518.026591: ('a', 8) 
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150519.0278525: ('a', 8) 

Could you clarify what you meant by "for the window (t1, t2) I will end up with data from (t1 + offset, t2 + offset)"?

@thomas-chauvet
Contributor

thomas-chauvet commented Nov 23, 2022

Intro

I added some prints in the window_processor to make it clearer in terms of datetimes and timestamps.

Note: First of all, with a WINDOW size of 10 and one event produced per second, we should have 10 elements per window. That doesn't seem to be the case in your example, but I don't know which window size you are using.

Example

Then, I will base my explanation on this output (see below for the code generating these logs):

[2022-11-23 08:39:40,256] [46850] [WARNING] Window 2022-11-23 08:39:15 - 2022-11-23 08:39:24 
[2022-11-23 08:39:40,257] [46850] [WARNING] Window 1669189155.0 - 1669189164.9 has 10 events 
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189160.294888, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:20'} 
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189161.302301, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:21'} 
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189162.297276, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:22'} 
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189163.30537, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:23'} 
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189164.300895, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:24'} 
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189165.310261, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:25'} 
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189166.3034098, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:26'} 
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189167.312484, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:27'} 
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189168.305968, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:28'} 
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189169.316384, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:29'} 

We have:

  • window size: 10 seconds
  • window step: 5 seconds
  • window expires: 10 + 5 = 15 seconds

Issue: elements in window do not have the same datetime

In the example we see that we are processing the window from 2022-11-23 08:39:15 to 2022-11-23 08:39:24.

However, the elements contain data from 2022-11-23 08:39:20 to 2022-11-23 08:39:29. The elements' timestamps correspond exactly to the window's timestamps + window_step, i.e. we have a shift of window_step.

Note that this implies that WINDOW_EXPIRES must be greater than the window STEP. If it is not, the window will miss some elements.
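The constraint can be checked with back-of-the-envelope arithmetic, under the assumption that the shift observed above equals STEP (the helper name and model are mine, not Faust's):

```python
# Back-of-the-envelope check of the constraint WINDOW_EXPIRES > STEP.
# Assuming the shift above, window (t1, t2) actually carries data from
# (t1 + STEP, t2 + STEP), so its last element arrives STEP seconds after
# the nominal end t2.  The window must survive at least that long.
WINDOW, STEP = 10, 5

def window_sees_all_elements(expires: float) -> bool:
    nominal_end = WINDOW                   # t2, in relative seconds
    last_arrival = nominal_end + STEP      # shifted arrival of last element
    dropped_at = nominal_end + expires     # when the table expires the window
    return dropped_at >= last_arrival

print(window_sees_all_elements(WINDOW + 5))  # True:  expires=15 covers the shift
print(window_sees_all_elements(3))           # False: expires < STEP loses data
```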

Code

import sys
import time
import types
from datetime import datetime, timedelta
from heapq import heappop
from typing import cast

import faust
from faust.types.windows import WindowT


async def _custom_del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # processed_window is (key, (window_start, window_end)):
                            # processed_window[0] is the message key, not a
                            # timestamp.  A list (not a set) is used because
                            # events may be unhashable values such as dicts.
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )

                for key_to_remove in keys_to_remove:
                    value = self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove,
                            window_data[key_to_remove[0]]
                            if key_to_remove[0] in window_data
                            else value,
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )


class RawModel(faust.Record):
    key: str
    date: datetime
    value: float


TOPIC = "raw-event"
TABLE = "hopping_table"
KAFKA = "kafka://localhost:29092"
CLEANUP_INTERVAL = 1
WINDOW = 10
STEP = 5
WINDOW_EXPIRES = WINDOW + 5
PARTITIONS = 1

app = faust.App(
    "windowed-hopping-3",
    broker=KAFKA,
    topic_partitions=PARTITIONS,
    topic_disable_leader=True,
)

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)


def window_processor(key, events):
    """
    The real window is in fact from window_start + step to window_end + step.
    """
    window_start = key[1][0]
    window_end = key[1][1]
    window_start_dt = datetime.fromtimestamp(window_start).strftime("%Y-%m-%d %H:%M:%S")
    window_end_dt = datetime.fromtimestamp(window_end).strftime("%Y-%m-%d %H:%M:%S")
    count = len(events)

    print("-" * 80)
    print(f"Window {window_start_dt} - {window_end_dt}")
    print(f"Window {window_start} - {window_end} has {count} events")
    for event in events:
        event["datetime"] = datetime.fromtimestamp(event["date"]).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        print(event)
    print("-" * 80)


table = app.Table(
    TABLE,
    default=list,
    partitions=PARTITIONS,
    on_window_close=window_processor,
)

table._del_old_keys = types.MethodType(_custom_del_old_keys, table)

hopping_table = table.hopping(
    WINDOW, STEP, expires=timedelta(seconds=WINDOW_EXPIRES)
).relative_to_field(RawModel.date)


@app.agent(source)
async def print_windowed_events(stream):
    event: RawModel
    async for event in stream:
        value_list = hopping_table[event.key].value()
        value_list.append({"date": event.date, "key": event.key, "value": event.value})
        hopping_table[event.key] = value_list


@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=1, date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=999, date=time.time()), key="b")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.argv.extend(["worker", "-l", "info"])
    app.main()

@fonty422
Collaborator

fonty422 commented Apr 4, 2023

Just wondering whether there's a way to have this data stick around after the worker running it has died. The issue I'm finding is that on restart, or after a rebalance, the agent/worker running this has no historic window information; it's simply gone. Even when the most recent window should still be around (that is, not yet closed), there appears to be no data available from it.
The same appears true of tumbling tables - like they're held only in memory and not persisted at all.

@wbarnha
Member Author

wbarnha commented Apr 7, 2023

The same appears true of tumbling tables - like they're held only in memory and not persisted at all.

Correct, there's nothing in our RocksDB driver that makes it persist. Admittedly, I'm not an expert on the tumbling window scenario, so my progress on it has been delayed.

@wbarnha wbarnha requested a review from patkivikram June 20, 2023 00:37
@wbarnha wbarnha marked this pull request as ready for review June 20, 2023 00:37
@wbarnha
Member Author

wbarnha commented Jun 20, 2023

I wish I remembered why I shelved this. I think there's something I forgot to test, but I would like to revisit this PR.

@mert-kirpici

Hi, quite recently I started using Faust for a project I am working on and stumbled upon the issue with hopping windows not behaving as they should.

I would really like to see this PR get merged. I was wondering if this is still in progress, or if there's anything to be done to land this change. I am not very familiar with the codebase, but I can take a crack at it if there's something missing.

@wbarnha
Member Author

wbarnha commented Mar 29, 2024

Hi, quite recently I started using Faust for a project I am working on and stumbled upon the issue with hopping windows not behaving as they should.

I would really like to see this PR get merged. I was wondering if this is still in progress, or if there's anything to be done to land this change. I am not very familiar with the codebase, but I can take a crack at it if there's something missing.

Hey there, thanks for the note. I'll revisit this! I left it open because I wanted to review the changes further before merging in case something got broken by them.

@wbarnha
Member Author

wbarnha commented Mar 29, 2024

Okay, I think I'm satisfied now. I added a sanity test for the case where there are no window ranges, and it exhibits the same behavior as well.

@wbarnha wbarnha self-assigned this Mar 29, 2024
@wbarnha wbarnha merged commit adcbc81 into faust-streaming:master Mar 30, 2024
19 of 23 checks passed
@wbarnha wbarnha deleted the doncat-tables-fix branch March 30, 2024 00:13
Development

Successfully merging this pull request may close these issues.

HoppingWindow
6 participants