Conversation

@jayantdb (Contributor) commented Sep 4, 2025

What changes were proposed in this pull request?

This PR fixes an issue where inputRowsPerSecond and processedRowsPerSecond in the streaming progress
metrics JSON were rendered in scientific notation (e.g., 1.9871777605776876E8). The fix uses safe
Decimal casting so that the values are rendered in a more human-readable plain-decimal format.
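As a rough, dependency-free sketch of the idea (not the exact patch — the helper name and the one-decimal rounding scale here are illustrative, chosen to match the "After" output below), converting the Double to a BigDecimal before serializing forces plain decimal rendering:

```scala
import scala.math.BigDecimal.RoundingMode

// Illustrative helper (hypothetical name): render a rate in plain decimal form.
def toPlainDecimal(value: Double): String =
  BigDecimal(value).setScale(1, RoundingMode.HALF_UP).toString

// Double.toString uses scientific notation at this magnitude...
assert(6.923076923076923e7.toString == "6.923076923076923E7")
// ...while the BigDecimal round-trip renders the same value plainly.
assert(toPlainDecimal(6.923076923076923e7) == "69230769.2")
```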

Results

Before the change

{
  "id" : "9b512179-ea36-4b98-9d79-049d13813894",
  "runId" : "f85e2894-9582-493d-9b94-ce03e5490241",
  "name" : "TestFormatting",
  "timestamp" : "2025-09-04T10:57:02.897Z",
  "batchId" : 0,
  "batchDuration" : 1410,
  "numInputRows" : 900000,
  "inputRowsPerSecond" : 6.923076923076923E7,
  "processedRowsPerSecond" : 638297.8723404256,
  "durationMs" : {
    "addBatch" : 1101,
    "commitOffsets" : 157,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 3,
    "triggerExecution" : 1410,
    "walCommit" : 149
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#133]",
    "startOffset" : null,
    "endOffset" : 0,
    "latestOffset" : null,
    "numInputRows" : 900000,
    "inputRowsPerSecond" : 6.923076923076923E7,
    "processedRowsPerSecond" : 638297.8723404256
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 900000
  }
} 

After the change

{
  "id" : "03497c93-7ab7-4e14-ba5f-dadbfc8a4bf6",
  "runId" : "3933cdde-f99d-4a29-8bb8-d13bbb5df425",
  "name" : "TestFormatting",
  "timestamp" : "2025-09-04T15:50:45.500Z",
  "batchId" : 0,
  "batchDuration" : 1444,
  "numInputRows" : 900000,
  "inputRowsPerSecond" : 69230769.2,
  "processedRowsPerSecond" : 623268.7,
  "durationMs" : {
    "addBatch" : 1147,
    "commitOffsets" : 152,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 3,
    "triggerExecution" : 1444,
    "walCommit" : 142
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#133]",
    "startOffset" : null,
    "endOffset" : 0,
    "latestOffset" : null,
    "numInputRows" : 900000,
    "inputRowsPerSecond" : 69230769.2,
    "processedRowsPerSecond" : 623268.7
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 900000
  }
}

Why are the changes needed?

Improves the readability of Spark Structured Streaming progress metrics JSON.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Run this Maven test:

./build/mvn -pl sql/core,sql/api -am test \
  -DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \
  -DwildcardTestName="SPARK-53491"

Results:

Run completed in 10 seconds, 680 milliseconds.
Total number of tests run: 12
Suites: completed 2, aborted 0
Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

Was this patch authored or co-authored using generative AI tooling?

No

@jayantdb jayantdb changed the title [SPARK-53491][SS] Fix exponential formatting of inputRowsPerSecond an… [SPARK-53491][SS] Fix exponential formatting of inputRowsPerSecond and processedRowsPerSecond in progress metrics JSON Sep 4, 2025
@jayantdb (Contributor, Author) commented Sep 4, 2025

@anishshri-db could you please review this PR? Thanks!

@@ -400,6 +400,35 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
assert(data(0).getAs[Timestamp](0).equals(validValue))
}

test("SPARK-53491: `inputRowsPerSecond` and `processedRowsPerSecond` " +
Review comment from a Member:
nit: what is the reason to use "`"?


print(progress)

assert(!(progress \ "inputRowsPerSecond").values.toString.contains("E"))
Review comment from a Member:
nit: will it be better to use matchers instead of assert?

val df = inputData.toDF()
val query = df.writeStream
.format("memory")
.queryName("TestFormatting")
Review comment from a Contributor:
nit: let's use a different name here?


val progress = query.lastProgress.jsonValue

(progress \ "inputRowsPerSecond").values.toString should not include "E"
Review comment from a Contributor:
What is this doing exactly? Maybe add some comments?

@anishshri-db (Contributor):

@jayantdb - please look into CI failures here - https://github.com/jayantdb/spark/actions/runs/17472105263/job/49622983276 ?

@anishshri-db (Contributor):

Can you also paste the new output for the progress metrics with your change ?

@jayantdb (Contributor, Author) commented Sep 5, 2025

> Can you also paste the new output for the progress metrics with your change?

> @jayantdb - please look into CI failures here - https://github.com/jayantdb/spark/actions/runs/17472105263/job/49622983276 ?

@anishshri-db The CI pipeline is failing due to Scala linter with this message:

Scalastyle checks passed.
The scalafmt check failed on sql/connect or sql/connect at following occurrences:

org.apache.maven.plugin.MojoExecutionException: Scalafmt: Unformatted files found
Error:  Failed to execute goal org.antipathy:mvn-scalafmt_2.13:1.1.1713302731.c3d0074:format (default-cli) on project spark-sql-api_2.13: Error formatting Scala files: Scalafmt: Unformatted files found -> [Help 1]

Before submitting your change, please make sure to format your code using the following command:
./build/mvn scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl sql/api -pl sql/connect/common -pl sql/connect/server -pl sql/connect/shims -pl sql/connect/client/jvm
Error: Process completed with exit code 1.

The failure seems to be due to the formatting in the sql/connect packages.

Upon running the following check, I can see 1000+ files are marked as unformatted:

./build/mvn scalafmt:format \
  -Dscalafmt.skip=false \
  -Dscalafmt.validateOnly=true \
  -Dscalafmt.changedOnly=false \
  -pl sql/core
For example:

[INFO] - Requires formatting: PrunedScanSuite.scala
[INFO] - Requires formatting: ResolvedDataSourceSuite.scala
[INFO] - Requires formatting: DisableUnnecessaryBucketedScanSuite.scala
[INFO] - Requires formatting: SaveLoadSuite.scala
[INFO] - Requires formatting: DDLSourceLoadSuite.scala
[INFO] - Requires formatting: fakeExternalSources.scala
[INFO] - Requires formatting: InsertSuite.scala
[INFO] - Requires formatting: PathOptionSuite.scala
[INFO] - Requires formatting: TableScanSuite.scala
[INFO] - Requires formatting: BucketedWriteSuite.scala
[INFO] - Requires formatting: PartitionedWriteSuite.scala
[INFO] - Requires formatting: FiltersSuite.scala
[INFO] - Requires formatting: DataSourceAnalysisSuite.scala
[....]
[INFO] - Formatted: TPCBase.scala
[....]
[INFO] - Requires formatting: VariantShreddingSuite.scala
[INFO] - Requires formatting: DataFrameTableValuedFunctionsSuite.scala
[INFO] - Requires formatting: IntegratedUDFTestUtils.scala
[INFO] - Requires formatting: DeprecatedAPISuite.scala
[INFO] - Requires formatting: ReplaceIntegerLiteralsWithOrdinalsSqlSuite.scala
[INFO] - Requires formatting: SubquerySuite.scala
[INFO] - Requires formatting: DataFrameAggregateSuite.scala
[INFO] - Requires formatting: TPCHBase.scala
[ERROR] 
org.apache.maven.plugin.MojoExecutionException: Scalafmt: Unformatted files found
    at org.antipathy.mvn_scalafmt.FormatMojo.execute (FormatMojo.java:91)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:126)

I didn't touch any of these thousands of files, so I am unsure if I should do anything or not.

Kindly check and advise.

@jayantdb (Contributor, Author) commented Sep 5, 2025

> Can you also paste the new output for the progress metrics with your change?

@anishshri-db , you can find the output of my code change at the comment in the JIRA: https://issues.apache.org/jira/browse/SPARK-53491

Pasting the output here as well for your reference:

{
  "id" : "03497c93-7ab7-4e14-ba5f-dadbfc8a4bf6",
  "runId" : "3933cdde-f99d-4a29-8bb8-d13bbb5df425",
  "name" : "TestFormatting",
  "timestamp" : "2025-09-04T15:50:45.500Z",
  "batchId" : 0,
  "batchDuration" : 1444,
  "numInputRows" : 900000,
  "inputRowsPerSecond" : 69230769.2,
  "processedRowsPerSecond" : 623268.7,
  "durationMs" : {
    "addBatch" : 1147,
    "commitOffsets" : 152,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 3,
    "triggerExecution" : 1444,
    "walCommit" : 142
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#133]",
    "startOffset" : null,
    "endOffset" : 0,
    "latestOffset" : null,
    "numInputRows" : 900000,
    "inputRowsPerSecond" : 69230769.2,
    "processedRowsPerSecond" : 623268.7
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 900000
  }
}

def safeDecimalToJValue(value: Double): JValue = {
  if (value.isNaN || value.isInfinity) {
    JNothing
  }
Review comment from a Member:
nit: while not enforced, a single-line `} else {` is more commonly used in Spark, AFAIK.

Review comment from a Contributor:
+1 - can you please fix the formatting here?

/** Convert BigDecimal to JValue while handling empty or infinite values */
def safeDecimalToJValue(value: Double): JValue = {
  if (value.isNaN || value.isInfinity) {
    JNothing
Review comment from a Member:
Is there a corresponding test case for isNaN?
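For context, the NaN/Infinity guard can be exercised with a dependency-free sketch (Option stands in for json4s's JValue/JNothing here, and `safeDecimalOpt` is a hypothetical name; the real suite would assert on the rendered JSON instead):

```scala
// Sketch of the NaN/Infinity guard. Without it, BigDecimal(Double.NaN)
// would throw a NumberFormatException during serialization.
def safeDecimalOpt(value: Double): Option[BigDecimal] =
  if (value.isNaN || value.isInfinity) None
  else Some(BigDecimal(value))

assert(safeDecimalOpt(Double.NaN).isEmpty)
assert(safeDecimalOpt(Double.PositiveInfinity).isEmpty)
assert(safeDecimalOpt(1.5).contains(BigDecimal(1.5)))
```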

@jayantdb (Contributor, Author) commented Sep 9, 2025

> (quotes the Sep 5 comment above about the scalafmt CI failure in full)

@anishshri-db Update: All tests have now passed in the build.

@vrozov (Member) left a comment:

LGTM

@anishshri-db (Contributor):

Would this break parsing for streaming query listeners that might be parsing these values ?

cc - @HeartSaVioR to confirm

@jayantdb (Contributor, Author):

> Would this break parsing for streaming query listeners that might be parsing these values?
>
> cc - @HeartSaVioR to confirm

No, @anishshri-db, it won't break anything.

I have kept the core implementation of inputRowsPerSecond and processedRowsPerSecond unchanged; they are still exposed as Double, since Double is a standard data type and Decimal is not.

Accessing them via query.lastProgress.inputRowsPerSecond or query.lastProgress.processedRowsPerSecond still returns Double values, which may render in scientific notation when stringified. Downstream apps using these individual metrics (mostly through streaming listeners) get the same behavior as before, and can cast the exponential values to Decimal if they need plain formatting.
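For instance (a hypothetical consumer-side snippet; the literal stands in for the Double a listener would receive from the progress object), casting to Decimal on the consumer side yields plain formatting:

```scala
// Stand-in for the Double a listener gets from progress.inputRowsPerSecond.
val inputRowsPerSecond: Double = 6.923076923076923e7

// The raw Double stringifies with an exponent at this magnitude...
assert(inputRowsPerSecond.toString.contains("E"))
// ...but a BigDecimal cast renders it in plain decimal form.
val plain = BigDecimal(inputRowsPerSecond).toString
assert(!plain.contains("E"))
```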

@HeartSaVioR (Contributor) left a comment:

+1

Could you please still update the PR description to contain the specific event before the fix vs after the fix?

@jayantdb (Contributor, Author):

> +1
>
> Could you please still update the PR description to contain the specific event before the fix vs after the fix?

@HeartSaVioR, it's done. I have added the results to the Description.

@HeartSaVioR (Contributor):

Thanks! Merging to master.
