[SPARK-50270][SS][PYTHON] Added custom state metrics for TransformWithStateInPandas

### What changes were proposed in this pull request?

- Added custom state metrics for TransformWithStateInPandas (a usage sketch follows below).
- Clean up TTL state properly: TTL cleanup now runs before the state store commit.
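
The new metrics are reported per stateful operator and surface in the streaming query progress, as the updated Python test in the diff also exercises. A minimal sketch of reading them (hedged: `q` is a hypothetical `StreamingQuery` handle, and `lastProgress` can be `None` before the first progress event):

```python
# Sketch: inspect the custom state metrics of a TransformWithStateInPandas query.
# `q` is a StreamingQuery handle returned by DataStreamWriter.start().
progress = q.lastProgress
if progress is not None:
    custom = progress.stateOperators[0].customMetrics
    # Counters added by this PR, e.g. numValueStateVars, numDeletedStateVars,
    # numRegisteredTimers, numValuesRemovedDueToTTLExpiry.
    for name, value in custom.items():
        print(f"{name} = {value}")
```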

### Why are the changes needed?

Brings the Python TransformWithStateInPandas operator to parity with Scala TransformWithState, which already reports these custom state metrics and performs TTL cleanup.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A Python unit test was extended with assertions on the new custom metrics.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48808 from bogao007/state-metrics.

Authored-by: bogao007 <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
bogao007 authored and HeartSaVioR committed Nov 9, 2024
1 parent 202400a commit 8f5d8d4
Showing 2 changed files with 29 additions and 1 deletion.
@@ -213,6 +213,10 @@ def test_transform_with_state_in_pandas_query_restarts(self):
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

+        # Verify custom metrics.
+        self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numValueStateVars"] > 0)
+        self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numDeletedStateVars"] > 0)
+
         q.stop()

         self._prepare_test_resource2(input_path)
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, groupAndProject, resolveArgOffsets}
-import org.apache.spark.sql.execution.streaming.{StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, StateStoreWriter, WatermarkSupport}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateSchemaValidationResult, StateStore, StateStoreConf, StateStoreId, StateStoreOps, StateStoreProviderId}
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
@@ -126,6 +126,29 @@ case class TransformWithStateInPandasExec(
     List.empty
   }

+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
+    Seq(
+      // metrics around state variables
+      StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"),
+      StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"),
+      StatefulOperatorCustomSumMetric("numMapStateVars", "Number of map state variables"),
+      StatefulOperatorCustomSumMetric("numDeletedStateVars", "Number of deleted state variables"),
+      // metrics around timers
+      StatefulOperatorCustomSumMetric("numRegisteredTimers", "Number of registered timers"),
+      StatefulOperatorCustomSumMetric("numDeletedTimers", "Number of deleted timers"),
+      StatefulOperatorCustomSumMetric("numExpiredTimers", "Number of expired timers"),
+      // metrics around TTL
+      StatefulOperatorCustomSumMetric("numValueStateWithTTLVars",
+        "Number of value state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numListStateWithTTLVars",
+        "Number of list state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numMapStateWithTTLVars",
+        "Number of map state variables with TTL"),
+      StatefulOperatorCustomSumMetric("numValuesRemovedDueToTTLExpiry",
+        "Number of values removed due to TTL expiry")
+    )
+  }
+
   /**
    * Produces the result of the query as an `RDD[InternalRow]`
    */
@@ -247,6 +270,7 @@ case class TransformWithStateInPandasExec(
         // by the upstream (consumer) operators in addition to the processing in this operator.
         allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
         commitTimeMs += timeTakenMs {
+          processorHandle.doTtlCleanup()
           store.commit()
         }
         setStoreMetrics(store)
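For context on the TTL change above: the TTL metrics only move when a state variable is declared with a TTL, and `doTtlCleanup()` evicts expired values at state store commit time, which is what feeds `numValuesRemovedDueToTTLExpiry`. Below is a minimal, hedged sketch of a processor declaring such a variable, assuming the Spark 4.0 PySpark handle API (`getValueState(state_name, schema, ttl_duration_ms)`); the processor, state name, and TTL value are illustrative and not part of this PR:

```python
import pandas as pd

from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)


class CountProcessor(StatefulProcessor):
    """Illustrative processor whose state would feed the metrics above."""

    def init(self, handle: StatefulProcessorHandle) -> None:
        # A value state with a 10s TTL: counted once in numValueStateWithTTLVars;
        # values evicted by TTL cleanup at commit are counted in
        # numValuesRemovedDueToTTLExpiry.
        self.count_state = handle.getValueState(
            "countState", "count LONG", ttl_duration_ms=10000
        )

    def handleInputRows(self, key, rows, timer_values):
        # Count rows per key; each update also refreshes the value's TTL.
        count = sum(len(batch) for batch in rows)
        prev = self.count_state.get()
        if prev is not None:
            count += prev[0]
        self.count_state.update((count,))
        yield pd.DataFrame({"id": [key[0]], "count": [count]})

    def close(self) -> None:
        pass
```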
