Skip to content

Conversation

heyihong
Copy link
Contributor

@heyihong heyihong commented Sep 4, 2025

What changes were proposed in this pull request?

This PR fixes a critical issue in the protobuf conversion of observed metrics in Spark Connect, specifically when dealing with complex data types like structs, arrays, and maps. The main changes include:

  1. Modified Observation class to store Row objects instead of Map[String, Any]: Changed the internal promise type from Promise[Map[String, Any]] to Promise[Row] to preserve type information during protobuf serialization/deserialization.
  2. Enhanced protobuf conversion for complex types:
    • Added proper handling for struct types by creating GenericRowWithSchema objects instead of tuples
    • Added support for map type conversion in LiteralValueProtoConverter
    • Improved data type inference with a new getDataType method that properly handles all literal types
  3. Fixed observed metrics: Modified the observed metrics processing to include data type information in the protobuf conversion, ensuring that complex types are properly serialized and deserialized.

Why are the changes needed?

The previous implementation had several issues:

  1. Data type loss: Observed metrics were losing their original data types during Protobuf conversion, causing errors
  2. Struct handling problems: The conversion logic didn't properly handle Row objects and struct types

Does this PR introduce any user-facing change?

Yes, this PR fixes a bug that was preventing users from successfully using observed metrics with complex data types (structs, arrays, maps) in Spark Connect. Users can now:

  • Use struct() expressions in observed metrics and receive properly typed GenericRowWithSchema objects
  • Use array() expressions in observed metrics and receive properly typed arrays
  • Use map() expressions in observed metrics and receive properly typed maps

Previously, the code below would fail.

val observation = Observation("struct")
spark
  .range(10)
  .observe(observation, struct(count(lit(1)).as("rows"), max("id").as("maxid")).as("struct"))
  .collect()
observation.get
// Below is the error message:
"""
org.apache.spark.SparkUnsupportedOperationException: literal [10,9] not supported (yet).
org.apache.spark.sql.connect.common.LiteralValueProtoConverter$.toLiteralProtoBuilder(LiteralValueProtoConverter.scala:104)
org.apache.spark.sql.connect.common.LiteralValueProtoConverter$.toLiteralProto(LiteralValueProtoConverter.scala:203)
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution$.$anonfun$createObservedMetricsResponse$2(SparkConnectPlanExecution.scala:571)
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution$.$anonfun$createObservedMetricsResponse$2$adapted(SparkConnectPlanExecution.scala:570)
"""

How was this patch tested?

build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite -- -z SPARK-53490"
build/sbt "connect/testOnly *LiteralExpressionProtoConverterSuite"

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 1.5.9

@heyihong heyihong changed the title [SPARK-53490] Fix Protobuf conversion in observed metrics [SPARK-53490][CONNECT] Fix Protobuf conversion in observed metrics Sep 4, 2025
@heyihong heyihong changed the title [SPARK-53490][CONNECT] Fix Protobuf conversion in observed metrics [SPARK-53490][CONNECT][SQL] Fix Protobuf conversion in observed metrics Sep 4, 2025
@heyihong
Copy link
Contributor Author

heyihong commented Sep 4, 2025

Copy link
Member

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
What's the plan for Python client?

@heyihong
Copy link
Contributor Author

heyihong commented Sep 10, 2025

LGTM. What's the plan for Python client?

I am not sure if classic/connect mode support complex data types in PySpark. I will follow up on the Python client in this ticket.

@zhengruifeng
Copy link
Contributor

merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants