[SPARK-51097] [SS] Re-introduce RocksDB state store's last uploaded snapshot version instance metrics #50195

Open · wants to merge 2 commits into master
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2226,6 +2226,19 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
 
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics included in streaming query progress messages " +
+        "per stateful operator. Instance metrics are selected based on metric-specific ordering " +
+        "to minimize noise in the progress report."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+      .createWithDefault(5)
+
   val STATE_STORE_FORMAT_VALIDATION_ENABLED =
     buildConf("spark.sql.streaming.stateStore.formatValidation.enabled")
       .internal()
@@ -5769,6 +5782,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
 
+  def numStateStoreInstanceMetricsToReport: Int =
+    getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
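For intuition on how the new limit interacts with each metric's ordering, here is a small illustrative sketch (not code from this PR; `metricValues` and `limit` are hypothetical stand-ins for the driver-side values and the SQLConf lookup):

```scala
// Keep only the N instance metrics the progress report should show, using the
// metric-specific ordering (e.g. lowest snapshot versions first, since the
// lagging instances are the interesting ones to surface).
def selectForReport(
    metric: StateStoreInstanceMetric,
    metricValues: Map[String, Long], // metric name -> current value
    limit: Int // spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport
  ): Seq[(String, Long)] = {
  metricValues.toSeq
    .sortBy(_._2)(metric.ordering)
    .take(limit)
}
```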
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -224,7 +224,7 @@ case class StreamingSymmetricHashJoinExec(
 
   override def shortName: String = "symmetricHashJoin"
 
-  private val stateStoreNames =
+  override val stateStoreNames: Seq[String] =
     SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
 
   override def operatorStateMetadata(
@@ -527,9 +527,8 @@ case class StreamingSymmetricHashJoinExec(
         (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows)
       numTotalStateRows += combinedMetrics.numKeys
       stateMemory += combinedMetrics.memoryUsedBytes
-      combinedMetrics.customMetrics.foreach { case (metric, value) =>
-        longMetric(metric.name) += value
-      }
+      setStoreCustomMetrics(combinedMetrics.customMetrics)
+      setStoreInstanceMetrics(combinedMetrics.instanceMetrics)
     }
 
     val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide);
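The inline foreach over custom metrics is replaced by two shared helpers. Their bodies are not part of this excerpt; a minimal sketch of what such helpers could look like on the operator's StateStoreWriter side, under that assumption:

```scala
// Illustrative only: assumed helper shapes, not code from this diff.
protected def setStoreCustomMetrics(metrics: Map[StateStoreCustomMetric, Long]): Unit = {
  metrics.foreach { case (metric, value) =>
    // Custom metrics accumulate across partitions.
    longMetric(metric.name) += value
  }
}

protected def setStoreInstanceMetrics(metrics: Map[StateStoreInstanceMetric, Long]): Unit = {
  metrics.foreach { case (metric, value) =>
    val sqlMetric = longMetric(metric.name)
    // Instance metrics describe a single store instance, so a new observation
    // is merged via the metric's own combine rule instead of being summed.
    sqlMetric.set(metric.combine(sqlMetric, value))
  }
}
```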
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import java.util.Set
 import java.util.UUID
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
@@ -147,6 +147,10 @@ class RocksDB(
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile protected var loadedVersion: Long = -1L // -1 = nothing valid is loaded
 
+  // Can be updated by whichever thread uploaded a snapshot, which could be the task thread,
+  // the maintenance thread, or both. -1 means no version has ever been uploaded.
+  protected val lastUploadedSnapshotVersion: AtomicLong = new AtomicLong(-1L)
+
   // variables to manage checkpoint ID. Once a checkpointing finishes, it needs to return
   // `lastCommittedStateStoreCkptId` as the committed checkpointID, as well as
   // `lastCommitBasedStateStoreCkptId` as the checkpointID of the previous version that it is based on.
@@ -1297,6 +1301,7 @@
       bytesCopied = fileManagerMetrics.bytesCopied,
       filesCopied = fileManagerMetrics.filesCopied,
       filesReused = fileManagerMetrics.filesReused,
+      lastUploadedSnapshotVersion = lastUploadedSnapshotVersion.get(),
       zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
       nativeOpsMetrics = nativeOpsMetrics)
   }
@@ -1465,6 +1470,7 @@
         log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
         log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+      lastUploadedSnapshotVersion.set(snapshot.version)
     } finally {
       snapshot.close()
     }
@@ -1916,7 +1922,8 @@ case class RocksDBMetrics(
     bytesCopied: Long,
     filesReused: Long,
     zipFileBytesUncompressed: Option[Long],
-    nativeOpsMetrics: Map[String, Long]) {
+    nativeOpsMetrics: Map[String, Long],
+    lastUploadedSnapshotVersion: Long) {
   def json: String = Serialization.write(this)(RocksDBMetrics.format)
 }
 
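Because both the task thread and the maintenance thread can finish a snapshot upload, the version is published through an AtomicLong rather than a plain var. A stripped-down sketch of the pattern (illustrative, outside the RocksDB class):

```scala
import java.util.concurrent.atomic.AtomicLong

// -1 is the sentinel for "no snapshot uploaded yet".
val lastUploaded = new AtomicLong(-1L)

// Called by whichever thread completes an upload (task or maintenance).
def recordUpload(version: Long): Unit = lastUploaded.set(version)

// Read lock-free on the metrics path, e.g. when RocksDBMetrics is built.
def reportedVersion: Long = lastUploaded.get()
```

The AtomicLong guarantees the metrics read sees a fully written value from either thread without any extra synchronization.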
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -316,14 +316,21 @@ private[sql] class RocksDBStateStoreProvider
       ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
         Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
 
+      val stateStoreInstanceMetrics = Map[StateStoreInstanceMetric, Long](
+        CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED
+          .withNewId(id.partitionId, id.storeName) -> rocksDBMetrics.lastUploadedSnapshotVersion
+      )
+
       StateStoreMetrics(
         rocksDBMetrics.numUncommittedKeys,
         rocksDBMetrics.totalMemUsageBytes,
-        stateStoreCustomMetrics)
+        stateStoreCustomMetrics,
+        stateStoreInstanceMetrics
+      )
     } else {
       logInfo(log"Failed to collect metrics for store_id=${MDC(STATE_STORE_ID, id)} " +
         log"and version=${MDC(VERSION_NUM, version)}")
-      StateStoreMetrics(0, 0, Map.empty)
+      StateStoreMetrics(0, 0, Map.empty, Map.empty)
     }
   }
@@ -497,6 +504,8 @@ private[sql] class RocksDBStateStoreProvider
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = ALL_CUSTOM_METRICS
 
+  override def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = ALL_INSTANCE_METRICS
+
   private[state] def latestVersion: Long = rocksDB.getLatestVersion()
 
   /** Internal fields and methods */
@@ -888,6 +897,10 @@ object RocksDBStateStoreProvider {
     CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
     CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS,
     CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
+
+  val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric()
+
+  val ALL_INSTANCE_METRICS = Seq(CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED)
 }
 
 /** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
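withNewId is what turns the shared metric template into a per-instance metric: the partition id and store name are baked into the SQLMetric name. A quick illustration using the name template defined later in StateStore.scala (the partition id here is made up):

```scala
val metric = StateStoreSnapshotLastUploadInstanceMetric()
  .withNewId(partitionId = 2, storeName = StateStoreId.DEFAULT_STORE_NAME)

// name follows s"$metricPrefix.partition_${partitionId.get}_$storeName"
assert(metric.name == "SnapshotLastUploaded.partition_2_default")
```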
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -249,12 +249,17 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
  * @param memoryUsedBytes Memory used by the state store
  * @param customMetrics Custom implementation-specific metrics
  *                      The metrics reported through this must have the same `name` as those
- *                      reported by `StateStoreProvider.customMetrics`.
+ *                      reported by `StateStoreProvider.supportedCustomMetrics`.
+ * @param instanceMetrics Custom implementation-specific metrics that are specific to state stores
+ *                        The metrics reported through this must have the same `name` as those
+ *                        reported by `StateStoreProvider.supportedInstanceMetrics`,
+ *                        including partition id and store name.
  */
 case class StateStoreMetrics(
     numKeys: Long,
     memoryUsedBytes: Long,
-    customMetrics: Map[StateStoreCustomMetric, Long])
+    customMetrics: Map[StateStoreCustomMetric, Long],
+    instanceMetrics: Map[StateStoreInstanceMetric, Long] = Map.empty)
 
 /**
  * State store checkpoint information, used to pass checkpointing information from executors
@@ -284,7 +289,8 @@ object StateStoreMetrics {
     StateStoreMetrics(
       allMetrics.map(_.numKeys).sum,
       allMetrics.map(_.memoryUsedBytes).sum,
-      combinedCustomMetrics)
+      combinedCustomMetrics,
+      allMetrics.flatMap(_.instanceMetrics).toMap)
   }
 }
@@ -321,6 +327,86 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric {
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def initValue: Long
+
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  /**
+   * Defines how instance metrics are selected for progress reporting.
+   * Metrics are sorted by value using this ordering, and only the first N metrics are displayed.
+   * For example, to display the highest N metrics by value, use Ordering.Long.reverse.
+   */
+  def ordering: Ordering[Long]
+
+  /** Whether this instance metric should be reported if it is unchanged from its initial value */
+  def ignoreIfUnchanged: Boolean
+
+  /**
+   * Defines how to merge metric values from different executors for the same state store
+   * instance in situations like speculative execution or provider unloading. In most cases,
+   * the original metric value is at its initial value.
+   */
+  def combine(originalMetric: SQLMetric, value: Long): Long
+
+  def name: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric name")
+    s"$metricPrefix.partition_${partitionId.get}_$storeName"
+  }
+
+  def desc: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric description")
+    s"$descPrefix (partitionId = ${partitionId.get}, storeName = $storeName)"
+  }
+
+  def withNewId(partitionId: Int, storeName: String): StateStoreInstanceMetric
+}
+
+case class StateStoreSnapshotLastUploadInstanceMetric(
+    partitionId: Option[Int] = None,
+    storeName: String = StateStoreId.DEFAULT_STORE_NAME)
+  extends StateStoreInstanceMetric {
+
+  override def metricPrefix: String = "SnapshotLastUploaded"
+
+  override def descPrefix: String = {
+    "The last uploaded version of the snapshot for a specific state store instance"
+  }
+
+  override def initValue: Long = -1L
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric = {
+    SQLMetrics.createSizeMetric(sparkContext, desc, initValue)
+  }
+
+  override def ordering: Ordering[Long] = Ordering.Long
+
+  override def ignoreIfUnchanged: Boolean = false
+
+  override def combine(originalMetric: SQLMetric, value: Long): Long = {
+    // SQLMetric.value clamps negative values to 0. Since the last uploaded
+    // snapshot version starts at an initial value of -1, we need special
+    // handling here to avoid turning the -1 sentinel into a 0.
+    if (originalMetric.isZero) {
+      value
+    } else {
+      // Use max to grab the most recent snapshot version across all executors
+      // of the same store instance
+      Math.max(originalMetric.value, value)
+    }
+  }
+
+  override def withNewId(
+      partitionId: Int,
+      storeName: String): StateStoreSnapshotLastUploadInstanceMetric = {
+    copy(partitionId = Some(partitionId), storeName = storeName)
+  }
+}
+
 sealed trait KeyStateEncoderSpec {
   def keySchema: StructType
   def jsonValue: JValue
@@ -495,9 +581,16 @@ trait StateStoreProvider {
   /**
    * Optional custom metrics that the implementation may want to report.
    * @note The StateStore objects created by this provider must report the same custom metrics
-   * (specifically, same names) through `StateStore.metrics`.
+   * (specifically, same names) through `StateStore.metrics.customMetrics`.
    */
   def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Nil
+
+  /**
+   * Optional custom state store instance metrics that the implementation may want to report.
+   * @note The StateStore objects created by this provider must report the same instance metrics
+   * (specifically, same names) through `StateStore.metrics.instanceMetrics`.
+   */
+  def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = Seq.empty
 }
 
 object StateStoreProvider {
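The combine rule is the subtle part of this hunk, so a worked example may help. The helper below mirrors the logic above with plain values, decoupled from SQLMetric (illustrative only):

```scala
// originalIsZero stands in for SQLMetric.isZero on the driver-side metric.
def combineSketch(originalIsZero: Boolean, originalValue: Long, incoming: Long): Long =
  if (originalIsZero) incoming // unset metric: take the incoming value as-is
  else Math.max(originalValue, incoming) // otherwise keep the newest version

assert(combineSketch(originalIsZero = true, originalValue = 0L, incoming = 12L) == 12L)
assert(combineSketch(originalIsZero = false, originalValue = 12L, incoming = 9L) == 12L)
```

The isZero branch exists because SQLMetric.value clamps negatives to 0: blindly taking Math.max against an unset metric would convert the -1 "never uploaded" sentinel into a phantom version 0.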
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -447,7 +447,9 @@ class SymmetricHashJoinStateManager(
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
         case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
-      }
+      },
+      // We want to collect instance metrics from both state stores
+      keyWithIndexToValueMetrics.instanceMetrics ++ keyToNumValuesMetrics.instanceMetrics
     )
   }
 
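Because each instance metric's name embeds the store name, metrics from the two stores backing the join state cannot collide when the maps are concatenated. A small illustration (the partition id and store names are made up; the join's real store names come from SymmetricHashJoinStateManager.allStateStoreNames):

```scala
val left = StateStoreSnapshotLastUploadInstanceMetric()
  .withNewId(partitionId = 3, storeName = "left-keyWithIndexToValue")
val right = StateStoreSnapshotLastUploadInstanceMetric()
  .withNewId(partitionId = 3, storeName = "left-keyToNumValues")

// Distinct store names yield distinct metric keys, so `++` keeps one
// entry per physical store instance instead of overwriting.
val merged = Map[StateStoreInstanceMetric, Long](left -> 42L) ++ Map(right -> 41L)
assert(merged.size == 2)
```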