diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 46aad4b6bc60d..384920f03f1a2 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -213,6 +213,10 @@ def test_transform_with_state_in_pandas_query_restarts(self):
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
+        # Verify custom metrics.
+        self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numValueStateVars"] > 0)
+        self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numDeletedStateVars"] > 0)
+
         q.stop()
 
         self._prepare_test_resource2(input_path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
index fda3d27e5eaa0..7dd4d4647eeba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, groupAndProject, resolveArgOffsets}
-import org.apache.spark.sql.execution.streaming.{StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaValidationResult, StateStore, StateStoreConf, StateStoreId, StateStoreOps, StateStoreProviderId}
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
@@ -126,6 +126,29 @@ case class TransformWithStateInPandasExec(
     List.empty
   }
 
+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
+    Seq(
+      // metrics around state variables
+      StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"),
+      StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"),
+      StatefulOperatorCustomSumMetric("numMapStateVars", "Number of map state variables"),
+      StatefulOperatorCustomSumMetric("numDeletedStateVars", "Number of deleted state variables"),
+      // metrics around timers
+      StatefulOperatorCustomSumMetric("numRegisteredTimers", "Number of registered timers"),
+      StatefulOperatorCustomSumMetric("numDeletedTimers", "Number of deleted timers"),
+      StatefulOperatorCustomSumMetric("numExpiredTimers", "Number of expired timers"),
+      // metrics around TTL
+      StatefulOperatorCustomSumMetric("numValueStateWithTTLVars",
+        "Number of value state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numListStateWithTTLVars",
+        "Number of list state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numMapStateWithTTLVars",
+        "Number of map state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numValuesRemovedDueToTTLExpiry",
+        "Number of values removed due to TTL expiry")
+    )
+  }
+
   /**
    * Produces the result of the query as an `RDD[InternalRow]`
    */
@@ -247,6 +270,7 @@ case class TransformWithStateInPandasExec(
       // by the upstream (consumer) operators in addition to the processing in this operator.
       allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
       commitTimeMs += timeTakenMs {
+        processorHandle.doTtlCleanup()
         store.commit()
       }
       setStoreMetrics(store)
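
Not part of the patch: a minimal Python sketch of how the custom metrics registered above could be read from a query's progress, mirroring the access pattern used in the updated test. The helper name collect_tws_custom_metrics and the assumption that customMetrics behaves like a plain dict are illustrative only.

    # Illustrative sketch only -- assumes q.lastProgress returns a StreamingQueryProgress
    # object and that customMetrics supports dict-style lookup, as the test above does.
    from pyspark.sql.streaming import StreamingQuery

    # Names match the metrics registered in customStatefulOperatorMetrics above.
    _TWS_METRIC_NAMES = [
        "numValueStateVars", "numListStateVars", "numMapStateVars", "numDeletedStateVars",
        "numRegisteredTimers", "numDeletedTimers", "numExpiredTimers",
        "numValueStateWithTTLVars", "numListStateWithTTLVars", "numMapStateWithTTLVars",
        "numValuesRemovedDueToTTLExpiry",
    ]

    def collect_tws_custom_metrics(query: StreamingQuery) -> dict:
        """Return transformWithStateInPandas custom metrics from the last progress,
        assuming a single stateful operator in the query (as in the test above)."""
        progress = query.lastProgress
        if progress is None or not progress.stateOperators:
            return {}
        custom = progress.stateOperators[0].customMetrics
        return {name: custom[name] for name in _TWS_METRIC_NAMES if name in custom}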