[SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing

### What changes were proposed in this pull request?

In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies paths via `Path#makeQualified`, while `PartitioningAwareFileIndex` qualifies them via `FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate to `Path#makeQualified`, but others, like `HarFileSystem`, contain fs-specific logic that can produce different results. Such inconsistencies can lead to a situation where Spark can't find partitions of the source file, because the qualified paths built by `Path` and by `FileSystem` differ. Therefore, for uniformity, `DataSource#checkAndGlobPathIfNecessary` should also use the `FileSystem` path qualification.
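A minimal sketch of the two qualification paths (the `har:///…` path and the bare `Configuration` here are illustrative, and assume the HAR filesystem is available on the classpath):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val path = new Path("har:///user/hadoop/archive.har/test.csv")
val fs = path.getFileSystem(hadoopConf)

// Old behavior: qualify via Path, using only the fs URI and working directory.
val qualifiedByPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

// New behavior: delegate to the FileSystem, which may apply fs-specific
// logic (e.g. HarFileSystem) and can produce a different qualified path.
val qualifiedByFs = fs.makeQualified(path)
```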

### Why are the changes needed?

Allow users to read files from Hadoop archives (`.har`) using the DataFrameReader API.
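For example (the archive location below is hypothetical; `har://` paths are resolved by Hadoop's `HarFileSystem`):

```scala
// Read a CSV file stored inside a Hadoop archive. The archive path is
// illustrative; any reachable .har location works the same way.
val df = spark.read
  .schema("str STRING")
  .csv("har:///user/hadoop/archives/data.har/test.csv")
```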

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.

Authored-by: Tigran Manasyan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
tigrulya-exe authored and cloud-fan committed Feb 8, 2024
1 parent b345b23 commit b7edc5f
Showing 7 changed files with 21 additions and 1 deletion.
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -138,3 +138,4 @@ people.xml
 ui-test/package.json
 ui-test/package-lock.json
 core/src/main/resources/org/apache/spark/ui/static/package.json
+.*\.har
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -760,7 +760,7 @@ object DataSource extends Logging {
     val qualifiedPaths = pathStrings.map { pathString =>
       val path = new Path(pathString)
       val fs = path.getFileSystem(hadoopConf)
-      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      fs.makeQualified(path)
     }
 
     // Split the paths into glob and non glob paths, because we don't need to do an existence check
2 changes: 2 additions & 0 deletions sql/core/src/test/resources/test-data/test-archive.har/_index
@@ -0,0 +1,2 @@
+%2F dir 1707380620211+493+tigrulya+hadoop 0 0 test.csv
+%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop
2 changes: 2 additions & 0 deletions sql/core/src/test/resources/test-data/test-archive.har/_masterindex
@@ -0,0 +1,2 @@
+3
+0 1948547033 0 119
3 changes: 3 additions & 0 deletions sql/core/src/test/resources/test-data/test-archive.har/part-0
@@ -0,0 +1,3 @@
+1
+2
+3
4 changes: 4 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.PrivateMethodTester
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     mockGlobResults.getOrElse(pathPattern, Array())
   }
+
+  override def getUri: URI = URI.create("mockFs://mockFs/")
 }
8 changes: 8 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -1363,4 +1363,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-39910: read files from Hadoop archives") {
+    val fileSchema = new StructType().add("str", StringType)
+    val harPath = testFile("test-data/test-archive.har")
+      .replaceFirst("file:/", "har:/")
+
+    testRead(spark.read.schema(fileSchema).csv(s"$harPath/test.csv"), data, fileSchema)
+  }
 }
