diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 34af8cd9b070e..8c70457b426f7 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1571,12 +1571,14 @@ def foreach(self, f: Union[Callable[[Row], None], "SupportsProcess"]) -> "DataSt
     def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter":
         """
         Sets the output of the streaming query to be processed using the provided
-        function. This is supported only the in the micro-batch execution modes (that is, when the
-        trigger is not continuous). In every micro-batch, the provided function will be called in
-        every micro-batch with (i) the output rows as a DataFrame and (ii) the batch identifier.
-        The batchId can be used deduplicate and transactionally write the output
-        (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
-        to exactly same for the same batchId (assuming all operations are deterministic in the
+        function. This is supported only in the micro-batch execution modes (that is, when the
+        trigger is not continuous). In every micro-batch, the provided function will be called
+        with (i) the output rows as a :class:`DataFrame <pyspark.sql.DataFrame>`,
+        and (ii) the batch identifier.
+
+        The batchId can be used to deduplicate and transactionally write the resulting
+        DataFrame to external systems. The output DataFrame is guaranteed
+        to be exactly the same for the same batchId (assuming all operations are deterministic in the
         query).
 
         .. versionadded:: 2.4.0
@@ -1596,14 +1598,17 @@ def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamW
         >>> df = spark.readStream.format("rate").load()
         >>> my_value = -1
         >>> def func(batch_df, batch_id):
+        ...     from pyspark.sql.functions import lit
         ...     global my_value
-        ...     my_value = 100
-        ...     batch_df.collect()
+        ...     (batch_df
+        ...         .withColumn("my_value", lit(my_value))
+        ...         .collect())
         ...
         >>> q = df.writeStream.foreachBatch(func).start()
         >>> time.sleep(3)
         >>> q.stop()
-        >>> # if in Spark Connect, my_value = -1, else my_value = 100
+        >>> # if run with Spark Connect, this will fail because
+        >>> # `func` does not have access to the global `my_value`
         """
         from py4j.java_gateway import java_import
         from pyspark.java_gateway import ensure_callback_server_started
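
For reference, a minimal sketch (not part of the patch) of the pattern the revised docstring describes: using the batchId passed to foreachBatch to write each micro-batch idempotently to an external sink. The output and checkpoint paths below are placeholders, and write_batch is an illustrative name, not an API from the patch.

    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.functions import lit

    spark = SparkSession.builder.getOrCreate()

    def write_batch(batch_df: DataFrame, batch_id: int) -> None:
        # For a deterministic query, the output rows for a given batchId are
        # the same on every retry, so overwriting a location derived from
        # batchId makes replays after a failure idempotent.
        (batch_df
            .withColumn("batch_id", lit(batch_id))
            .write
            .mode("overwrite")
            .parquet("/tmp/output/batch_id=%d" % batch_id))  # placeholder path

    query = (spark.readStream.format("rate").load()
             .writeStream
             .foreachBatch(write_batch)
             .option("checkpointLocation", "/tmp/checkpoint")  # placeholder path
             .start())

As the updated doctest illustrates, under Spark Connect the function cannot rely on client-side globals such as `my_value`, so anything it needs (including imports like `lit`) should live inside the function body.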