[MINOR][DOCS] Update foreachBatch docs #50188

Open · wants to merge 1 commit into base: master
23 changes: 14 additions & 9 deletions python/pyspark/sql/streaming/readwriter.py
@@ -1571,12 +1571,14 @@ def foreach(self, f: Union[Callable[[Row], None], "SupportsProcess"]) -> "DataStreamWriter":
def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter":
"""
Sets the output of the streaming query to be processed using the provided
-function. This is supported only the in the micro-batch execution modes (that is, when the
-trigger is not continuous). In every micro-batch, the provided function will be called in
-every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.
-The batchId can be used deduplicate and transactionally write the output
-(that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
-to exactly same for the same batchId (assuming all operations are deterministic in the
+function. This is supported only in the micro-batch execution modes (that is, when the
+trigger is not continuous). In every micro-batch, the provided function will be called
+with (i) the output rows as a :class:`DataFrame <pyspark.sql.DataFrame>`,
+and (ii) the batch identifier.
+
+The batchId can be used to deduplicate and transactionally write the resulting
+DataFrame to external systems. The output DataFrame is guaranteed
+to be exactly the same for the same batchId (assuming all operations are deterministic in the
query).

.. versionadded:: 2.4.0
@@ -1596,14 +1598,17 @@ def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter":
>>> df = spark.readStream.format("rate").load()
>>> my_value = -1
>>> def func(batch_df, batch_id):
+...     from pyspark.sql.functions import lit
...     global my_value
...     my_value = 100
-...     batch_df.collect()
+...     (batch_df
+...         .withColumn("my_value", lit(my_value))
+...         .collect())
...
>>> q = df.writeStream.foreachBatch(func).start()
>>> time.sleep(3)
>>> q.stop()
->>> # if in Spark Connect, my_value = -1, else my_value = 100
+>>> # if in Spark Connect, this will fail, because
+>>> # the `func` doesn't have any global `my_value`
"""
from py4j.java_gateway import java_import
from pyspark.java_gateway import ensure_callback_server_started
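The deduplication guarantee described in the updated docstring is what enables exactly-once sinks on top of foreachBatch. Below is a minimal sketch of that pattern, assuming a sink that can durably record the last committed batch id; the names (write_batch, last_committed_batch_id) and the output path are illustrative, not part of this PR.

    from pyspark.sql import DataFrame

    last_committed_batch_id = -1  # stand-in for state the sink persists durably

    def write_batch(batch_df: DataFrame, batch_id: int) -> None:
        global last_committed_batch_id
        # After a failure and restart, foreachBatch may re-invoke the function
        # with the same batchId and (per the docstring guarantee) the same
        # output rows, so skipping already-committed ids makes the write
        # effectively idempotent.
        if batch_id <= last_committed_batch_id:
            return
        batch_df.write.mode("append").parquet("/tmp/output")  # assumed sink
        last_committed_batch_id = batch_id

    # Wired up exactly like the docstring example:
    # query = df.writeStream.foreachBatch(write_batch).start()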
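On the updated trailing comment: under Spark Connect the function handed to foreachBatch is serialized and executed outside the client process, so a `global` statement inside it refers to the server-side namespace rather than the client's, which is why mutating my_value no longer works there. A Connect-safe variant keeps all state local to the function; a hedged sketch (illustrative, not from the PR):

    from pyspark.sql.functions import lit

    def func(batch_df, batch_id):
        value = 100  # local variable: behaves the same in classic Spark and Spark Connect
        batch_df.withColumn("my_value", lit(value)).collect()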