diff --git a/conf/base/catalog.yml b/conf/base/catalog.yml
index d937843..b96c459 100644
--- a/conf/base/catalog.yml
+++ b/conf/base/catalog.yml
@@ -8,7 +8,6 @@ example_creditcard_data:
 
 streaming_creditcard_data:
   type: kedro_streaming.io.SparkStreamingDataSet
-  filepath: ignore
   file_format: kafka
   load_args:
     kafka.bootstrap.servers: "localhost:19092"
@@ -23,11 +22,9 @@ example_model:
 
 streaming_predictions:
   type: kedro_streaming.io.SparkStreamingDataSet
-  filepath: ignore
   file_format: console
 # streaming_predictions:
 #   type: kedro_streaming.io.SparkStreamingDataSet
-#   filepath: ignore
 #   file_format: kafka
 #   save_args:
 #     output_mode: append
diff --git a/src/kedro_streaming/io/spark_streaming_dataset.py b/src/kedro_streaming/io/spark_streaming_dataset.py
index a2a75b3..a90361c 100644
--- a/src/kedro_streaming/io/spark_streaming_dataset.py
+++ b/src/kedro_streaming/io/spark_streaming_dataset.py
@@ -188,20 +188,20 @@ class SparkStreamingDataSet(AbstractVersionedDataSet):
 
     def __init__(  # pylint: disable=too-many-arguments
         self,
-        filepath: str,
+        filepath: str = "",
         file_format: str = "parquet",
         load_args: Dict[str, Any] = None,
         save_args: Dict[str, Any] = None,
         version: Version = None,
         credentials: Dict[str, Any] = None,
     ) -> None:
-        """Creates a new instance of ``SparkDataSet``.
+        """Creates a new instance of ``SparkStreamingDataSet``.
 
         Args:
             filepath: Filepath in POSIX format to a Spark dataframe. When using
                 Databricks and working with data written to mount path points,
                 specify ``filepath``s for (versioned) ``SparkDataSet``s
-                starting with ``/dbfs/mnt``.
+                starting with ``/dbfs/mnt``. Required for file sources/sinks.
             file_format: File format used during load and save
                 operations. These are formats supported by the running
                 SparkContext include parquet, csv. For a list of supported
@@ -310,9 +310,9 @@ def _load(self) -> DataFrame:
 
     def _save(self, data: DataFrame) -> None:
         # save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))
-        data.writeStream.format("console").outputMode(
-            "append"
-        ).start().awaitTermination()
+        data.writeStream.start(
+            format=self._file_format, **self._save_args
+        ).awaitTermination()
 
     def _exists(self) -> bool:
         # TODO(deepyaman): Check that the stream exists, done for files.
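
The patch makes `filepath` optional and routes the sink through the dataset's configured `file_format` and `save_args` instead of a hard-coded console writer. Below is a minimal, standalone sketch (not part of the patch) of what the reworked `_save` path amounts to in plain PySpark; the `rate` source, the `save_args` dict, and the bounded `awaitTermination` timeout are illustrative assumptions, not values from this repo.

```python
# Hypothetical sketch: drive a streaming sink the way the patched _save() does,
# i.e. pass the catalog-configured format and save_args straight to start().
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streaming-sink-sketch").getOrCreate()

# Toy unbounded source standing in for the Kafka stream defined in catalog.yml.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Equivalent of the dataset's file_format / save_args after this change
# (values here mirror the catalog example, but are assumptions for the sketch).
file_format = "console"
save_args = {"outputMode": "append"}

query = df.writeStream.start(format=file_format, **save_args)
query.awaitTermination(10)  # bounded wait so the sketch terminates on its own
query.stop()
spark.stop()
```

Because `DataStreamWriter.start()` accepts the format, output mode, and arbitrary options as keyword arguments, forwarding `save_args` this way lets the catalog entry (e.g. the commented-out Kafka sink above) choose the sink without touching the dataset code.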