From 47067f8e6ba6dcaa9079c8d4026a286899243953 Mon Sep 17 00:00:00 2001 From: jingz-db Date: Tue, 10 Sep 2024 16:12:58 -0700 Subject: [PATCH] need to return iterator & unit tests, mostly done --- python/pyspark/sql/pandas/group_ops.py | 10 +- .../pyspark/sql/streaming/StateMessage_pb2.py | 16 +- .../sql/streaming/StateMessage_pb2.pyi | 16 +- .../sql/streaming/stateful_processor.py | 3 +- .../stateful_processor_api_client.py | 18 +- .../test_pandas_transform_with_state.py | 65 +++- .../execution/streaming/StateMessage.proto | 4 +- .../streaming/state/StateMessage.java | 318 +++++++++--------- ...ransformWithStateInPandasStateServer.scala | 11 +- .../streaming/IncrementalExecution.scala | 4 +- 10 files changed, 256 insertions(+), 209 deletions(-) diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py index d2ec37574da01..4855115afb1f4 100644 --- a/python/pyspark/sql/pandas/group_ops.py +++ b/python/pyspark/sql/pandas/group_ops.py @@ -508,12 +508,14 @@ def transformWithStateUDF( ) # only process initial state if first batch - batch_id = statefulProcessorApiClient.get_batch_id() - if batch_id == 0: + is_first_batch = statefulProcessorApiClient.is_first_batch() + statefulProcessorApiClient.set_implicit_key(key) + if is_first_batch: initial_state = statefulProcessorApiClient.get_initial_state(key) - statefulProcessor.handleInitialState(key, initial_state) + # if user did not provide initial state df, initial_state will be None + if initial_state is not None: + statefulProcessor.handleInitialState(key, initial_state) - statefulProcessorApiClient.set_implicit_key(key) result = statefulProcessor.handleInputRows(key, inputRows) return result diff --git a/python/pyspark/sql/streaming/StateMessage_pb2.py b/python/pyspark/sql/streaming/StateMessage_pb2.py index 0c95b5ec6732f..2d28faf326017 100644 --- a/python/pyspark/sql/streaming/StateMessage_pb2.py +++ b/python/pyspark/sql/streaming/StateMessage_pb2.py @@ -13,15 +13,15 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12StateMessage.proto\x12.org.apache.spark.sql.execution.streaming.state\"\xe9\x02\n\x0cStateRequest\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x66\n\x15statefulProcessorCall\x18\x02 \x01(\x0b\x32\x45.org.apache.spark.sql.execution.streaming.state.StatefulProcessorCallH\x00\x12\x64\n\x14stateVariableRequest\x18\x03 \x01(\x0b\x32\x44.org.apache.spark.sql.execution.streaming.state.StateVariableRequestH\x00\x12p\n\x1aimplicitGroupingKeyRequest\x18\x04 \x01(\x0b\x32J.org.apache.spark.sql.execution.streaming.state.ImplicitGroupingKeyRequestH\x00\x42\x08\n\x06method\"H\n\rStateResponse\x12\x12\n\nstatusCode\x18\x01 \x01(\x05\x12\x14\n\x0c\x65rrorMessage\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\x0c\"\xe0\x03\n\x15StatefulProcessorCall\x12X\n\x0esetHandleState\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetHandleStateH\x00\x12Y\n\rgetValueState\x18\x02 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12X\n\x0cgetListState\x18\x03 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12W\n\x0bgetMapState\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12U\n\tutilsCall\x18\x05 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.UtilsCallCommandH\x00\x42\x08\n\x06method\"z\n\x14StateVariableRequest\x12X\n\x0evalueStateCall\x18\x01 
\x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.ValueStateCallH\x00\x42\x08\n\x06method\"\xe0\x01\n\x1aImplicitGroupingKeyRequest\x12X\n\x0esetImplicitKey\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetImplicitKeyH\x00\x12^\n\x11removeImplicitKey\x18\x02 \x01(\x0b\x32\x41.org.apache.spark.sql.execution.streaming.state.RemoveImplicitKeyH\x00\x42\x08\n\x06method\"5\n\x10StateCallCommand\x12\x11\n\tstateName\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\t\"\xe1\x02\n\x0eValueStateCall\x12\x11\n\tstateName\x18\x01 \x01(\t\x12H\n\x06\x65xists\x18\x02 \x01(\x0b\x32\x36.org.apache.spark.sql.execution.streaming.state.ExistsH\x00\x12\x42\n\x03get\x18\x03 \x01(\x0b\x32\x33.org.apache.spark.sql.execution.streaming.state.GetH\x00\x12\\\n\x10valueStateUpdate\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.ValueStateUpdateH\x00\x12\x46\n\x05\x63lear\x18\x05 \x01(\x0b\x32\x35.org.apache.spark.sql.execution.streaming.state.ClearH\x00\x42\x08\n\x06method\"\x1d\n\x0eSetImplicitKey\x12\x0b\n\x03key\x18\x01 \x01(\x0c\"\x13\n\x11RemoveImplicitKey\"\x08\n\x06\x45xists\"\x05\n\x03Get\"!\n\x10ValueStateUpdate\x12\r\n\x05value\x18\x01 \x01(\x0c\"\x07\n\x05\x43lear\"\\\n\x0eSetHandleState\x12J\n\x05state\x18\x01 \x01(\x0e\x32;.org.apache.spark.sql.execution.streaming.state.HandleState\"\xca\x01\n\x10UtilsCallCommand\x12P\n\ngetBatchId\x18\x01 \x01(\x0b\x32:.org.apache.spark.sql.execution.streaming.state.GetBatchIdH\x00\x12Z\n\x0fgetInitialState\x18\x02 \x01(\x0b\x32?.org.apache.spark.sql.execution.streaming.state.GetInitialStateH\x00\x42\x08\n\x06method\"\x0c\n\nGetBatchId\" \n\x0fGetInitialState\x12\r\n\x05value\x18\x01 \x01(\x0c*K\n\x0bHandleState\x12\x0b\n\x07\x43REATED\x10\x00\x12\x0f\n\x0bINITIALIZED\x10\x01\x12\x12\n\x0e\x44\x41TA_PROCESSED\x10\x02\x12\n\n\x06\x43LOSED\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12StateMessage.proto\x12.org.apache.spark.sql.execution.streaming.state\"\xe9\x02\n\x0cStateRequest\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x66\n\x15statefulProcessorCall\x18\x02 \x01(\x0b\x32\x45.org.apache.spark.sql.execution.streaming.state.StatefulProcessorCallH\x00\x12\x64\n\x14stateVariableRequest\x18\x03 \x01(\x0b\x32\x44.org.apache.spark.sql.execution.streaming.state.StateVariableRequestH\x00\x12p\n\x1aimplicitGroupingKeyRequest\x18\x04 \x01(\x0b\x32J.org.apache.spark.sql.execution.streaming.state.ImplicitGroupingKeyRequestH\x00\x42\x08\n\x06method\"H\n\rStateResponse\x12\x12\n\nstatusCode\x18\x01 \x01(\x05\x12\x14\n\x0c\x65rrorMessage\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\x0c\"\xe0\x03\n\x15StatefulProcessorCall\x12X\n\x0esetHandleState\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetHandleStateH\x00\x12Y\n\rgetValueState\x18\x02 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12X\n\x0cgetListState\x18\x03 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12W\n\x0bgetMapState\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.StateCallCommandH\x00\x12U\n\tutilsCall\x18\x05 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.UtilsCallCommandH\x00\x42\x08\n\x06method\"z\n\x14StateVariableRequest\x12X\n\x0evalueStateCall\x18\x01 \x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.ValueStateCallH\x00\x42\x08\n\x06method\"\xe0\x01\n\x1aImplicitGroupingKeyRequest\x12X\n\x0esetImplicitKey\x18\x01 
\x01(\x0b\x32>.org.apache.spark.sql.execution.streaming.state.SetImplicitKeyH\x00\x12^\n\x11removeImplicitKey\x18\x02 \x01(\x0b\x32\x41.org.apache.spark.sql.execution.streaming.state.RemoveImplicitKeyH\x00\x42\x08\n\x06method\"5\n\x10StateCallCommand\x12\x11\n\tstateName\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x02 \x01(\t\"\xe1\x02\n\x0eValueStateCall\x12\x11\n\tstateName\x18\x01 \x01(\t\x12H\n\x06\x65xists\x18\x02 \x01(\x0b\x32\x36.org.apache.spark.sql.execution.streaming.state.ExistsH\x00\x12\x42\n\x03get\x18\x03 \x01(\x0b\x32\x33.org.apache.spark.sql.execution.streaming.state.GetH\x00\x12\\\n\x10valueStateUpdate\x18\x04 \x01(\x0b\x32@.org.apache.spark.sql.execution.streaming.state.ValueStateUpdateH\x00\x12\x46\n\x05\x63lear\x18\x05 \x01(\x0b\x32\x35.org.apache.spark.sql.execution.streaming.state.ClearH\x00\x42\x08\n\x06method\"\x1d\n\x0eSetImplicitKey\x12\x0b\n\x03key\x18\x01 \x01(\x0c\"\x13\n\x11RemoveImplicitKey\"\x08\n\x06\x45xists\"\x05\n\x03Get\"!\n\x10ValueStateUpdate\x12\r\n\x05value\x18\x01 \x01(\x0c\"\x07\n\x05\x43lear\"\\\n\x0eSetHandleState\x12J\n\x05state\x18\x01 \x01(\x0e\x32;.org.apache.spark.sql.execution.streaming.state.HandleState\"\xce\x01\n\x10UtilsCallCommand\x12T\n\x0cisFirstBatch\x18\x01 \x01(\x0b\x32<.org.apache.spark.sql.execution.streaming.state.IsFirstBatchH\x00\x12Z\n\x0fgetInitialState\x18\x02 \x01(\x0b\x32?.org.apache.spark.sql.execution.streaming.state.GetInitialStateH\x00\x42\x08\n\x06method\"\x0e\n\x0cIsFirstBatch\" \n\x0fGetInitialState\x12\r\n\x05value\x18\x01 \x01(\x0c*K\n\x0bHandleState\x12\x0b\n\x07\x43REATED\x10\x00\x12\x0f\n\x0bINITIALIZED\x10\x01\x12\x12\n\x0e\x44\x41TA_PROCESSED\x10\x02\x12\n\n\x06\x43LOSED\x10\x03\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'StateMessage_pb2', globals()) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _HANDLESTATE._serialized_start=2213 - _HANDLESTATE._serialized_end=2288 + _HANDLESTATE._serialized_start=2219 + _HANDLESTATE._serialized_end=2294 _STATEREQUEST._serialized_start=71 _STATEREQUEST._serialized_end=432 _STATERESPONSE._serialized_start=434 @@ -51,9 +51,9 @@ _SETHANDLESTATE._serialized_start=1866 _SETHANDLESTATE._serialized_end=1958 _UTILSCALLCOMMAND._serialized_start=1961 - _UTILSCALLCOMMAND._serialized_end=2163 - _GETBATCHID._serialized_start=2165 - _GETBATCHID._serialized_end=2177 - _GETINITIALSTATE._serialized_start=2179 - _GETINITIALSTATE._serialized_end=2211 + _UTILSCALLCOMMAND._serialized_end=2167 + _ISFIRSTBATCH._serialized_start=2169 + _ISFIRSTBATCH._serialized_end=2183 + _GETINITIALSTATE._serialized_start=2185 + _GETINITIALSTATE._serialized_end=2217 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/streaming/StateMessage_pb2.pyi b/python/pyspark/sql/streaming/StateMessage_pb2.pyi index 5adc14b670761..504a7a5fcfed4 100644 --- a/python/pyspark/sql/streaming/StateMessage_pb2.pyi +++ b/python/pyspark/sql/streaming/StateMessage_pb2.pyi @@ -21,10 +21,6 @@ class Get(_message.Message): __slots__ = [] def __init__(self) -> None: ... -class GetBatchId(_message.Message): - __slots__ = [] - def __init__(self) -> None: ... - class GetInitialState(_message.Message): __slots__ = ["value"] VALUE_FIELD_NUMBER: ClassVar[int] @@ -39,6 +35,10 @@ class ImplicitGroupingKeyRequest(_message.Message): setImplicitKey: SetImplicitKey def __init__(self, setImplicitKey: Optional[Union[SetImplicitKey, Mapping]] = ..., removeImplicitKey: Optional[Union[RemoveImplicitKey, Mapping]] = ...) 
-> None: ... +class IsFirstBatch(_message.Message): + __slots__ = [] + def __init__(self) -> None: ... + class RemoveImplicitKey(_message.Message): __slots__ = [] def __init__(self) -> None: ... @@ -106,12 +106,12 @@ class StatefulProcessorCall(_message.Message): def __init__(self, setHandleState: Optional[Union[SetHandleState, Mapping]] = ..., getValueState: Optional[Union[StateCallCommand, Mapping]] = ..., getListState: Optional[Union[StateCallCommand, Mapping]] = ..., getMapState: Optional[Union[StateCallCommand, Mapping]] = ..., utilsCall: Optional[Union[UtilsCallCommand, Mapping]] = ...) -> None: ... class UtilsCallCommand(_message.Message): - __slots__ = ["getBatchId", "getInitialState"] - GETBATCHID_FIELD_NUMBER: ClassVar[int] + __slots__ = ["getInitialState", "isFirstBatch"] GETINITIALSTATE_FIELD_NUMBER: ClassVar[int] - getBatchId: GetBatchId + ISFIRSTBATCH_FIELD_NUMBER: ClassVar[int] getInitialState: GetInitialState - def __init__(self, getBatchId: Optional[Union[GetBatchId, Mapping]] = ..., getInitialState: Optional[Union[GetInitialState, Mapping]] = ...) -> None: ... + isFirstBatch: IsFirstBatch + def __init__(self, isFirstBatch: Optional[Union[IsFirstBatch, Mapping]] = ..., getInitialState: Optional[Union[GetInitialState, Mapping]] = ...) -> None: ... class ValueStateCall(_message.Message): __slots__ = ["clear", "exists", "get", "stateName", "valueStateUpdate"] diff --git a/python/pyspark/sql/streaming/stateful_processor.py b/python/pyspark/sql/streaming/stateful_processor.py index 58cae640e319c..e9ef8d6eb40f8 100644 --- a/python/pyspark/sql/streaming/stateful_processor.py +++ b/python/pyspark/sql/streaming/stateful_processor.py @@ -164,6 +164,7 @@ def handleInitialState( self, key: Any, initialState: "PandasDataFrameLike" ) -> None: """ - Optional to implement + Optional to implement. Acts as a no-op if not defined or if no initial state is provided. + Invoked only once, during the first batch, so that users can process their initial state. """ pass diff --git a/python/pyspark/sql/streaming/stateful_processor_api_client.py b/python/pyspark/sql/streaming/stateful_processor_api_client.py index a5fc0b6139bc3..ef788505be889 100644 --- a/python/pyspark/sql/streaming/stateful_processor_api_client.py +++ b/python/pyspark/sql/streaming/stateful_processor_api_client.py @@ -122,26 +122,24 @@ def get_value_state(self, state_name: str, schema: Union[StructType, str]) -> No # TODO(SPARK-49233): Classify user facing errors. raise PySparkRuntimeError(f"Error initializing value state: " f"{response_message[1]}") - def get_batch_id(self) -> int: + def is_first_batch(self) -> bool: import pyspark.sql.streaming.StateMessage_pb2 as stateMessage - get_batch_id = stateMessage.GetBatchId() - request = stateMessage.UtilsCallCommand(getBatchId=get_batch_id) + is_first_batch = stateMessage.IsFirstBatch() + request = stateMessage.UtilsCallCommand(isFirstBatch=is_first_batch) stateful_processor_call = stateMessage.StatefulProcessorCall(utilsCall=request) message = stateMessage.StateRequest(statefulProcessorCall=stateful_processor_call) self._send_proto_message(message.SerializeToString()) response_message = self._receive_proto_message() status = response_message[0] - if status != 0: + if status == 0: + return True + elif status == 1: + return False + else: # TODO(SPARK-49233): Classify user facing errors. raise PySparkRuntimeError(f"Error getting batch id: " f"{response_message[1]}") - else: - if len(response_message[2]) == 0: - return -1 - # TODO: can we simply parse from utf8 string here? 
- batch_id = int(response_message[2]) - return batch_id def get_initial_state(self, key: Tuple) -> "PandasDataFrameLike": from pandas import DataFrame diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py index cec29d084f9a6..833b56bc0fe39 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py @@ -74,6 +74,11 @@ def _prepare_test_resource2(self, input_path): fw.write("1, 246\n") fw.write("1, 6\n") + def _prepare_test_resource3(self, input_path): + with open(input_path + "/text-test2.txt", "w") as fw: + fw.write("3, 12\n") + fw.write("0, 67\n") + def _build_test_df(self, input_path): df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 1).load(input_path) df_split = df.withColumn("split_values", split(df["value"], ",")) @@ -213,12 +218,11 @@ def test_transform_with_state_in_pandas_query_restarts(self): """ def _test_transform_with_state_in_pandas_basic( - self, stateful_processor, check_results, single_batch=False + self, stateful_processor, check_results ): input_path = tempfile.mkdtemp() self._prepare_test_resource1(input_path) - if not single_batch: - self._prepare_test_resource2(input_path) + self._prepare_test_resource3(input_path) df = self._build_test_df(input_path) @@ -229,15 +233,15 @@ def _test_transform_with_state_in_pandas_basic( output_schema = StructType( [ StructField("id", StringType(), True), - StructField("countAsString", StringType(), True), + StructField("value", StringType(), True), ] ) from pyspark.sql import GroupedData - data = [("0", 789), ("1", 987)] + data = [("0", 789), ("3", 987)] initial_state =\ - self.spark.createDataFrame(data, "id string, sth int")\ + self.spark.createDataFrame(data, "id string, initVal int")\ .groupBy("id") q = ( @@ -264,20 +268,58 @@ def test_transform_with_state_in_pandas_basic(self): def check_results(batch_df, batch_id): if batch_id == 0: + # for key 0, initial state was processed, and only processed once; + # for key 1, it did not appear in the initial state df; + # for key 3, it did not appear in the first batch of input keys, + # so it is not emitted assert set(batch_df.sort("id").collect()) == { - Row(id="0", countAsString="2"), - Row(id="1", countAsString="2"), + Row(id="0", value=str(789 + 123 + 46)), + Row(id="1", value=str(146 + 346)), } else: + # for key 0, verify initial state was only processed once, in the first batch; + # for key 3, initial state is not applied because the key + # first appears after the first batch assert set(batch_df.sort("id").collect()) == { - Row(id="0", countAsString="3"), - Row(id="1", countAsString="2"), + Row(id="0", value=str(789 + 123 + 46 + 67)), + Row(id="3", value=str(12)), } self._test_transform_with_state_in_pandas_basic(SimpleStatefulProcessorWithInitialState(), check_results) class SimpleStatefulProcessorWithInitialState(StatefulProcessor): + + def init(self, handle: StatefulProcessorHandle) -> None: + state_schema = StructType([StructField("value", IntegerType(), True)]) + self.value_state = handle.getValueState("value_state", state_schema) + + def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]: + exists = self.value_state.exists() + if exists: + value_row = self.value_state.get() + existing_value = value_row[0] + else: + existing_value = 0 + + accumulated_value = existing_value + + for pdf in rows: + value = 
pdf['temperature'].astype(int).sum() + accumulated_value += value + + self.value_state.update((accumulated_value,)) + + yield pd.DataFrame({"id": key, "value": str(accumulated_value)}) + + def handleInitialState(self, key, initialState) -> None: + initVal = initialState.at[0, "initVal"] + self.value_state.update((initVal,)) + + def close(self) -> None: + pass + +class SimpleStatefulProcessor(StatefulProcessor): dict = {0: {"0": 1, "1": 2}, 1: {"0": 4, "1": 3}} batch_id = 0 @@ -307,9 +349,6 @@ def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]: self.num_violations_state.update((updated_violations,)) yield pd.DataFrame({"id": key, "countAsString": str(count)}) - def handleInitialState(self, key, initialState) -> None: - raise Exception(f"I am inside handleInitialState, init state: {initialState.get('sth')}") - def close(self) -> None: pass diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto index 4c68d9086d664..5042fbb3575f3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto @@ -105,12 +105,12 @@ message SetHandleState { message UtilsCallCommand { oneof method { - GetBatchId getBatchId = 1; + IsFirstBatch isFirstBatch = 1; GetInitialState getInitialState = 2; } } -message GetBatchId { +message IsFirstBatch { } message GetInitialState { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java index a2fecbe8c49e2..dcaa0bc67b4f2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/state/StateMessage.java @@ -10554,19 +10554,19 @@ public interface UtilsCallCommandOrBuilder extends com.google.protobuf.MessageOrBuilder { /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return Whether the getBatchId field is set. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return Whether the isFirstBatch field is set. */ - boolean hasGetBatchId(); + boolean hasIsFirstBatch(); /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return The getBatchId. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return The isFirstBatch. 
*/ - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getGetBatchId(); + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getIsFirstBatch(); /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder getGetBatchIdOrBuilder(); + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder getIsFirstBatchOrBuilder(); /** * .org.apache.spark.sql.execution.streaming.state.GetInitialState getInitialState = 2; @@ -10630,7 +10630,7 @@ protected java.lang.Object newInstance( public enum MethodCase implements com.google.protobuf.Internal.EnumLite, com.google.protobuf.AbstractMessage.InternalOneOfEnum { - GETBATCHID(1), + ISFIRSTBATCH(1), GETINITIALSTATE(2), METHOD_NOT_SET(0); private final int value; @@ -10649,7 +10649,7 @@ public static MethodCase valueOf(int value) { public static MethodCase forNumber(int value) { switch (value) { - case 1: return GETBATCHID; + case 1: return ISFIRSTBATCH; case 2: return GETINITIALSTATE; case 0: return METHOD_NOT_SET; default: return null; @@ -10666,35 +10666,35 @@ public int getNumber() { methodCase_); } - public static final int GETBATCHID_FIELD_NUMBER = 1; + public static final int ISFIRSTBATCH_FIELD_NUMBER = 1; /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return Whether the getBatchId field is set. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return Whether the isFirstBatch field is set. */ @java.lang.Override - public boolean hasGetBatchId() { + public boolean hasIsFirstBatch() { return methodCase_ == 1; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return The getBatchId. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return The isFirstBatch. 
*/ @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getGetBatchId() { + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getIsFirstBatch() { if (methodCase_ == 1) { - return (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_; + return (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_; } - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder getGetBatchIdOrBuilder() { + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder getIsFirstBatchOrBuilder() { if (methodCase_ == 1) { - return (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_; + return (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_; } - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } public static final int GETINITIALSTATE_FIELD_NUMBER = 2; @@ -10743,7 +10743,7 @@ public final boolean isInitialized() { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { if (methodCase_ == 1) { - output.writeMessage(1, (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_); + output.writeMessage(1, (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_); } if (methodCase_ == 2) { output.writeMessage(2, (org.apache.spark.sql.execution.streaming.state.StateMessage.GetInitialState) method_); @@ -10759,7 +10759,7 @@ public int getSerializedSize() { size = 0; if (methodCase_ == 1) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(1, (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_); + .computeMessageSize(1, (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_); } if (methodCase_ == 2) { size += com.google.protobuf.CodedOutputStream @@ -10783,8 +10783,8 @@ public boolean equals(final java.lang.Object obj) { if (!getMethodCase().equals(other.getMethodCase())) return false; switch (methodCase_) { case 1: - if (!getGetBatchId() - .equals(other.getGetBatchId())) return false; + if (!getIsFirstBatch() + .equals(other.getIsFirstBatch())) return false; break; case 2: if (!getGetInitialState() @@ -10806,8 +10806,8 @@ public int hashCode() { hash = (19 * hash) + getDescriptor().hashCode(); switch (methodCase_) { case 1: - hash = (37 * hash) + GETBATCHID_FIELD_NUMBER; - hash = (53 * hash) + getGetBatchId().hashCode(); + hash = (37 * hash) + ISFIRSTBATCH_FIELD_NUMBER; + hash = (53 * hash) + getIsFirstBatch().hashCode(); break; case 2: hash = (37 * hash) + GETINITIALSTATE_FIELD_NUMBER; @@ -10944,8 +10944,8 @@ private Builder( @java.lang.Override public Builder clear() { super.clear(); - if (getBatchIdBuilder_ != null) { - getBatchIdBuilder_.clear(); + if (isFirstBatchBuilder_ != null) { + isFirstBatchBuilder_.clear(); } if (getInitialStateBuilder_ != null) { 
getInitialStateBuilder_.clear(); @@ -10979,10 +10979,10 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallComm public org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallCommand buildPartial() { org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallCommand result = new org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallCommand(this); if (methodCase_ == 1) { - if (getBatchIdBuilder_ == null) { + if (isFirstBatchBuilder_ == null) { result.method_ = method_; } else { - result.method_ = getBatchIdBuilder_.build(); + result.method_ = isFirstBatchBuilder_.build(); } } if (methodCase_ == 2) { @@ -11042,8 +11042,8 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallCommand other) { if (other == org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallCommand.getDefaultInstance()) return this; switch (other.getMethodCase()) { - case GETBATCHID: { - mergeGetBatchId(other.getGetBatchId()); + case ISFIRSTBATCH: { + mergeIsFirstBatch(other.getIsFirstBatch()); break; } case GETINITIALSTATE: { @@ -11082,7 +11082,7 @@ public Builder mergeFrom( break; case 10: { input.readMessage( - getGetBatchIdFieldBuilder().getBuilder(), + getIsFirstBatchFieldBuilder().getBuilder(), extensionRegistry); methodCase_ = 1; break; @@ -11126,71 +11126,71 @@ public Builder clearMethod() { private com.google.protobuf.SingleFieldBuilderV3< - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder> getBatchIdBuilder_; + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder> isFirstBatchBuilder_; /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return Whether the getBatchId field is set. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return Whether the isFirstBatch field is set. */ @java.lang.Override - public boolean hasGetBatchId() { + public boolean hasIsFirstBatch() { return methodCase_ == 1; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; - * @return The getBatchId. + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; + * @return The isFirstBatch. 
*/ @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getGetBatchId() { - if (getBatchIdBuilder_ == null) { + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getIsFirstBatch() { + if (isFirstBatchBuilder_ == null) { if (methodCase_ == 1) { - return (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_; + return (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_; } - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } else { if (methodCase_ == 1) { - return getBatchIdBuilder_.getMessage(); + return isFirstBatchBuilder_.getMessage(); } - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - public Builder setGetBatchId(org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId value) { - if (getBatchIdBuilder_ == null) { + public Builder setIsFirstBatch(org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch value) { + if (isFirstBatchBuilder_ == null) { if (value == null) { throw new NullPointerException(); } method_ = value; onChanged(); } else { - getBatchIdBuilder_.setMessage(value); + isFirstBatchBuilder_.setMessage(value); } methodCase_ = 1; return this; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - public Builder setGetBatchId( - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder builderForValue) { - if (getBatchIdBuilder_ == null) { + public Builder setIsFirstBatch( + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder builderForValue) { + if (isFirstBatchBuilder_ == null) { method_ = builderForValue.build(); onChanged(); } else { - getBatchIdBuilder_.setMessage(builderForValue.build()); + isFirstBatchBuilder_.setMessage(builderForValue.build()); } methodCase_ = 1; return this; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - public Builder mergeGetBatchId(org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId value) { - if (getBatchIdBuilder_ == null) { + public Builder mergeIsFirstBatch(org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch value) { + if (isFirstBatchBuilder_ == null) { if (methodCase_ == 1 && - method_ != org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance()) { - method_ = org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.newBuilder((org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_) + method_ != org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance()) { + method_ = org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.newBuilder((org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_) .mergeFrom(value).buildPartial(); } 
else { method_ = value; @@ -11198,19 +11198,19 @@ public Builder mergeGetBatchId(org.apache.spark.sql.execution.streaming.state.St onChanged(); } else { if (methodCase_ == 1) { - getBatchIdBuilder_.mergeFrom(value); + isFirstBatchBuilder_.mergeFrom(value); } else { - getBatchIdBuilder_.setMessage(value); + isFirstBatchBuilder_.setMessage(value); } } methodCase_ = 1; return this; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - public Builder clearGetBatchId() { - if (getBatchIdBuilder_ == null) { + public Builder clearIsFirstBatch() { + if (isFirstBatchBuilder_ == null) { if (methodCase_ == 1) { methodCase_ = 0; method_ = null; @@ -11221,50 +11221,50 @@ public Builder clearGetBatchId() { methodCase_ = 0; method_ = null; } - getBatchIdBuilder_.clear(); + isFirstBatchBuilder_.clear(); } return this; } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder getGetBatchIdBuilder() { - return getGetBatchIdFieldBuilder().getBuilder(); + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder getIsFirstBatchBuilder() { + return getIsFirstBatchFieldBuilder().getBuilder(); } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder getGetBatchIdOrBuilder() { - if ((methodCase_ == 1) && (getBatchIdBuilder_ != null)) { - return getBatchIdBuilder_.getMessageOrBuilder(); + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder getIsFirstBatchOrBuilder() { + if ((methodCase_ == 1) && (isFirstBatchBuilder_ != null)) { + return isFirstBatchBuilder_.getMessageOrBuilder(); } else { if (methodCase_ == 1) { - return (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_; + return (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_; } - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } } /** - * .org.apache.spark.sql.execution.streaming.state.GetBatchId getBatchId = 1; + * .org.apache.spark.sql.execution.streaming.state.IsFirstBatch isFirstBatch = 1; */ private com.google.protobuf.SingleFieldBuilderV3< - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder> - getGetBatchIdFieldBuilder() { - if (getBatchIdBuilder_ == null) { + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder> + getIsFirstBatchFieldBuilder() { + if (isFirstBatchBuilder_ == null) { if (!(methodCase_ == 1)) { - method_ = org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + method_ = 
org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } - getBatchIdBuilder_ = new com.google.protobuf.SingleFieldBuilderV3< - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder>( - (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) method_, + isFirstBatchBuilder_ = new com.google.protobuf.SingleFieldBuilderV3< + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder>( + (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) method_, getParentForChildren(), isClean()); method_ = null; } methodCase_ = 1; onChanged();; - return getBatchIdBuilder_; + return isFirstBatchBuilder_; } private com.google.protobuf.SingleFieldBuilderV3< @@ -11472,30 +11472,30 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.UtilsCallComm } - public interface GetBatchIdOrBuilder extends - // @@protoc_insertion_point(interface_extends:org.apache.spark.sql.execution.streaming.state.GetBatchId) + public interface IsFirstBatchOrBuilder extends + // @@protoc_insertion_point(interface_extends:org.apache.spark.sql.execution.streaming.state.IsFirstBatch) com.google.protobuf.MessageOrBuilder { } /** - * Protobuf type {@code org.apache.spark.sql.execution.streaming.state.GetBatchId} + * Protobuf type {@code org.apache.spark.sql.execution.streaming.state.IsFirstBatch} */ - public static final class GetBatchId extends + public static final class IsFirstBatch extends com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:org.apache.spark.sql.execution.streaming.state.GetBatchId) - GetBatchIdOrBuilder { + // @@protoc_insertion_point(message_implements:org.apache.spark.sql.execution.streaming.state.IsFirstBatch) + IsFirstBatchOrBuilder { private static final long serialVersionUID = 0L; - // Use GetBatchId.newBuilder() to construct. - private GetBatchId(com.google.protobuf.GeneratedMessageV3.Builder builder) { + // Use IsFirstBatch.newBuilder() to construct. 
+ private IsFirstBatch(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } - private GetBatchId() { + private IsFirstBatch() { } @java.lang.Override @SuppressWarnings({"unused"}) protected java.lang.Object newInstance( UnusedPrivateParameter unused) { - return new GetBatchId(); + return new IsFirstBatch(); } @java.lang.Override @@ -11505,15 +11505,15 @@ protected java.lang.Object newInstance( } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor; + return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_fieldAccessorTable + return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_fieldAccessorTable .ensureFieldAccessorsInitialized( - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.class, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder.class); + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.class, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder.class); } private byte memoizedIsInitialized = -1; @@ -11549,10 +11549,10 @@ public boolean equals(final java.lang.Object obj) { if (obj == this) { return true; } - if (!(obj instanceof org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId)) { + if (!(obj instanceof org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch)) { return super.equals(obj); } - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId other = (org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) obj; + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch other = (org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) obj; if (!getUnknownFields().equals(other.getUnknownFields())) return false; return true; @@ -11570,69 +11570,69 @@ public int hashCode() { return hash; } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( java.nio.ByteBuffer data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( java.nio.ByteBuffer data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static 
org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom(byte[] data) + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom(java.io.InputStream input) + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input, extensionRegistry); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseDelimitedFrom(java.io.InputStream input) + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseDelimitedFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseDelimitedWithIOException(PARSER, input, extensionRegistry); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3 .parseWithIOException(PARSER, input); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId parseFrom( + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -11645,7 +11645,7 @@ public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBat public static Builder newBuilder() { return 
DEFAULT_INSTANCE.toBuilder(); } - public static Builder newBuilder(org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId prototype) { + public static Builder newBuilder(org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } @java.lang.Override @@ -11661,26 +11661,26 @@ protected Builder newBuilderForType( return builder; } /** - * Protobuf type {@code org.apache.spark.sql.execution.streaming.state.GetBatchId} + * Protobuf type {@code org.apache.spark.sql.execution.streaming.state.IsFirstBatch} */ public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:org.apache.spark.sql.execution.streaming.state.GetBatchId) - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchIdOrBuilder { + // @@protoc_insertion_point(builder_implements:org.apache.spark.sql.execution.streaming.state.IsFirstBatch) + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatchOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor; + return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_fieldAccessorTable + return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_fieldAccessorTable .ensureFieldAccessorsInitialized( - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.class, org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.Builder.class); + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.class, org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.Builder.class); } - // Construct using org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.newBuilder() + // Construct using org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.newBuilder() private Builder() { } @@ -11699,17 +11699,17 @@ public Builder clear() { @java.lang.Override public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor; + return org.apache.spark.sql.execution.streaming.state.StateMessage.internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor; } @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getDefaultInstanceForType() { - return org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance(); + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getDefaultInstanceForType() { + return org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance(); } @java.lang.Override - public 
org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId build() { - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId result = buildPartial(); + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch build() { + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } @@ -11717,8 +11717,8 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId bu } @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId buildPartial() { - org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId result = new org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId(this); + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch buildPartial() { + org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch result = new org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch(this); onBuilt(); return result; } @@ -11757,16 +11757,16 @@ public Builder addRepeatedField( } @java.lang.Override public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId) { - return mergeFrom((org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId)other); + if (other instanceof org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch) { + return mergeFrom((org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId other) { - if (other == org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId.getDefaultInstance()) return this; + public Builder mergeFrom(org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch other) { + if (other == org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch.getDefaultInstance()) return this; this.mergeUnknownFields(other.getUnknownFields()); onChanged(); return this; @@ -11821,23 +11821,23 @@ public final Builder mergeUnknownFields( } - // @@protoc_insertion_point(builder_scope:org.apache.spark.sql.execution.streaming.state.GetBatchId) + // @@protoc_insertion_point(builder_scope:org.apache.spark.sql.execution.streaming.state.IsFirstBatch) } - // @@protoc_insertion_point(class_scope:org.apache.spark.sql.execution.streaming.state.GetBatchId) - private static final org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId DEFAULT_INSTANCE; + // @@protoc_insertion_point(class_scope:org.apache.spark.sql.execution.streaming.state.IsFirstBatch) + private static final org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch DEFAULT_INSTANCE; static { - DEFAULT_INSTANCE = new org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId(); + DEFAULT_INSTANCE = new org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch(); } - public static org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getDefaultInstance() { + public static org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getDefaultInstance() { return DEFAULT_INSTANCE; } - private static final com.google.protobuf.Parser - PARSER = new com.google.protobuf.AbstractParser() { + private static 
final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { @java.lang.Override - public GetBatchId parsePartialFrom( + public IsFirstBatch parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { @@ -11856,17 +11856,17 @@ public GetBatchId parsePartialFrom( } }; - public static com.google.protobuf.Parser parser() { + public static com.google.protobuf.Parser parser() { return PARSER; } @java.lang.Override - public com.google.protobuf.Parser getParserForType() { + public com.google.protobuf.Parser getParserForType() { return PARSER; } @java.lang.Override - public org.apache.spark.sql.execution.streaming.state.StateMessage.GetBatchId getDefaultInstanceForType() { + public org.apache.spark.sql.execution.streaming.state.StateMessage.IsFirstBatch getDefaultInstanceForType() { return DEFAULT_INSTANCE; } @@ -12422,10 +12422,10 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.GetInitialSta com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_org_apache_spark_sql_execution_streaming_state_UtilsCallCommand_fieldAccessorTable; private static final com.google.protobuf.Descriptors.Descriptor - internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor; + internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor; private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_fieldAccessorTable; + internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_fieldAccessorTable; private static final com.google.protobuf.Descriptors.Descriptor internal_static_org_apache_spark_sql_execution_streaming_state_GetInitialState_descriptor; private static final @@ -12488,16 +12488,16 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.GetInitialSta "Key\"\010\n\006Exists\"\005\n\003Get\"!\n\020ValueStateUpdate" + "\022\r\n\005value\030\001 \001(\014\"\007\n\005Clear\"\\\n\016SetHandleSta" + "te\022J\n\005state\030\001 \001(\0162;.org.apache.spark.sql" + - ".execution.streaming.state.HandleState\"\312" + - "\001\n\020UtilsCallCommand\022P\n\ngetBatchId\030\001 \001(\0132" + - ":.org.apache.spark.sql.execution.streami" + - "ng.state.GetBatchIdH\000\022Z\n\017getInitialState" + - "\030\002 \001(\0132?.org.apache.spark.sql.execution." 
+ - "streaming.state.GetInitialStateH\000B\010\n\006met" + - "hod\"\014\n\nGetBatchId\" \n\017GetInitialState\022\r\n\005" + - "value\030\001 \001(\014*K\n\013HandleState\022\013\n\007CREATED\020\000\022" + - "\017\n\013INITIALIZED\020\001\022\022\n\016DATA_PROCESSED\020\002\022\n\n\006" + - "CLOSED\020\003b\006proto3" + ".execution.streaming.state.HandleState\"\316" + + "\001\n\020UtilsCallCommand\022T\n\014isFirstBatch\030\001 \001(" + + "\0132<.org.apache.spark.sql.execution.strea" + + "ming.state.IsFirstBatchH\000\022Z\n\017getInitialS" + + "tate\030\002 \001(\0132?.org.apache.spark.sql.execut" + + "ion.streaming.state.GetInitialStateH\000B\010\n" + + "\006method\"\016\n\014IsFirstBatch\" \n\017GetInitialSta" + + "te\022\r\n\005value\030\001 \001(\014*K\n\013HandleState\022\013\n\007CREA" + + "TED\020\000\022\017\n\013INITIALIZED\020\001\022\022\n\016DATA_PROCESSED" + + "\020\002\022\n\n\006CLOSED\020\003b\006proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -12592,12 +12592,12 @@ public org.apache.spark.sql.execution.streaming.state.StateMessage.GetInitialSta internal_static_org_apache_spark_sql_execution_streaming_state_UtilsCallCommand_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_org_apache_spark_sql_execution_streaming_state_UtilsCallCommand_descriptor, - new java.lang.String[] { "GetBatchId", "GetInitialState", "Method", }); - internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor = + new java.lang.String[] { "IsFirstBatch", "GetInitialState", "Method", }); + internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor = getDescriptor().getMessageTypes().get(15); - internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_fieldAccessorTable = new + internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_org_apache_spark_sql_execution_streaming_state_GetBatchId_descriptor, + internal_static_org_apache_spark_sql_execution_streaming_state_IsFirstBatch_descriptor, new java.lang.String[] { }); internal_static_org_apache_spark_sql_execution_streaming_state_GetInitialState_descriptor = getDescriptor().getMessageTypes().get(16); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala index 98e3498c9e55b..dc14a06d3087a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala @@ -180,9 +180,14 @@ class TransformWithStateInPandasStateServer( private def handleStatefulProcessorUtilRequest(message: UtilsCallCommand): Unit = { message.getMethodCase match { - case UtilsCallCommand.MethodCase.GETBATCHID => - val valueStr = statefulProcessorHandle.getQueryInfo().getBatchId.toString - sendResponse(0, null, ByteString.copyFromUtf8(valueStr)) + case UtilsCallCommand.MethodCase.ISFIRSTBATCH => + if (!hasInitialState) { + // In physical planning, hasInitialState will always be flipped + // if it is not first batch + sendResponse(1) + } else { + sendResponse(0) + } case UtilsCallCommand.MethodCase.GETINITIALSTATE => if (!hasInitialState || 
initialStateKeyToRowMap.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index b24b23c61f4d3..f7990977a1ddb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -328,11 +328,13 @@ class IncrementalExecution( ) case t: TransformWithStateInPandasExec => + val hasInitialState = (currentBatchId == 0L && t.hasInitialState) t.copy( stateInfo = Some(nextStatefulOperationStateInfo()), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermarkForLateEvents = None, - eventTimeWatermarkForEviction = None + eventTimeWatermarkForEviction = None, + hasInitialState = hasInitialState ) case m: FlatMapGroupsInPandasWithStateExec =>
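
Usage sketch (illustrative, not part of the diff): the snippet below shows how the new handleInitialState hook is meant to be driven end to end, mirroring SimpleStatefulProcessorWithInitialState from the tests above. The processor class name and the commented-out driver call are assumptions for illustration; the column names (id, temperature, initVal) and the value_state variable come from the test code in this patch.

import pandas as pd
from typing import Iterator

from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)
from pyspark.sql.types import IntegerType, StructField, StructType


class RunningSumWithInitialState(StatefulProcessor):
    # Hypothetical processor, modeled on SimpleStatefulProcessorWithInitialState above.
    def init(self, handle: StatefulProcessorHandle) -> None:
        state_schema = StructType([StructField("value", IntegerType(), True)])
        self.value_state = handle.getValueState("value_state", state_schema)

    def handleInitialState(self, key, initialState) -> None:
        # Invoked at most once per key, and only while the first batch is running;
        # seeds the running sum from the user-provided initial state row.
        self.value_state.update((initialState.at[0, "initVal"],))

    def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
        # Accumulate the sum of the "temperature" column across all input rows,
        # starting from whatever value (possibly seeded) is already in state.
        total = self.value_state.get()[0] if self.value_state.exists() else 0
        for pdf in rows:
            total += int(pdf["temperature"].astype(int).sum())
        self.value_state.update((total,))
        yield pd.DataFrame({"id": key, "value": str(total)})

    def close(self) -> None:
        pass


# Driver side (sketch): the grouped initial-state DataFrame is optional. When it is
# omitted, get_initial_state returns None and handleInitialState is skipped.
# initial_state = spark.createDataFrame(
#     [("0", 789), ("3", 987)], "id string, initVal int"
# ).groupBy("id")
# out = df.groupBy("id").transformWithStateInPandas(
#     statefulProcessor=RunningSumWithInitialState(),
#     outputStructType="id string, value string",
#     outputMode="Update",
#     timeMode="None",
#     initialState=initial_state,
# )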