title | date | tags | hidden
---|---|---|---
Spark Strategy:DataSource | 2018-08-12 15:32:25 -0700 | | true
DataFrameReader#format("json").load(paths : _*)
返回一个 DataFrame, 我们来看看背后发生了什么事情.
首先 load
会进入到:
DataFrameReader#
sparkSession.baseRelationToDataFrame(
  DataSource.apply(
    sparkSession,
    paths = paths,
    userSpecifiedSchema = userSpecifiedSchema,
    className = source,
    options = extraOptions.toMap).resolveRelation())
resolveRelation produces a resolved BaseRelation. If you look at the execution plan, you will see that the node at the table scan position of the plan tree is this Relation.
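As a quick sanity check, here is a minimal sketch (the path and column names are made up) that loads JSON and prints the plans; the logical plans end in a Relation[...] json leaf and the physical plan in a FileScan json node:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("datasource-demo").getOrCreate()

// hypothetical input path
val df = spark.read.format("json").load("/tmp/people.json")

// explain(true) prints the parsed/analyzed/optimized logical plans plus the physical plan;
// the table scan shows up as Relation[name#0,age#1L] json / FileScan json
df.explain(true)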
JsonDataSource#
def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
  val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
  val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
  JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
}
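JsonUtils.sample honors the JSON samplingRatio option, so inference does not have to scan every record. A small sketch (the path is hypothetical):

// Infer the schema from roughly 10% of the input instead of the whole dataset.
val people = spark.read
  .option("samplingRatio", "0.1")   // picked up by JSONOptions
  .json("/tmp/people.json")
people.printSchema()

// Or skip inference entirely by supplying the schema up front.
import org.apache.spark.sql.types._
val knownSchema = StructType(Seq(StructField("name", StringType), StructField("age", LongType)))
val people2 = spark.read.schema(knownSchema).json("/tmp/people.json")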
QueryExecution#
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}
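These lazy vals can be inspected directly from a Dataset; a sketch, reusing the df loaded above:

val qe = df.queryExecution

// Physical plan as picked by the planner (the first candidate from planner.plan(...)).
println(qe.sparkPlan)

// The same plan after prepareForExecution: exchanges, subqueries and
// WholeStageCodegenExec wrappers have been inserted by now.
println(qe.executedPlan)

// Builds the RDD[InternalRow] lineage; no job runs until an action is called on it.
val rdd = qe.toRdd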
QueryPlanner#
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...

  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))
  ...
}
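strategies is the list held by SparkPlanner, and FileSourceStrategy below is one of its members. You can also append your own Strategy through the experimental hook; a minimal sketch that simply declines every plan (assumes an active SparkSession named spark):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object NoOpStrategy extends Strategy {
  // Returning Nil means "I cannot plan this node"; the planner then tries the next
  // strategy, exactly like the `case _ => Nil` branch in FileSourceStrategy below.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = Seq(NoOpStrategy)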
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
* by user specified columns.
*
* At a high level planning occurs in several phases:
* - Split filters by when they need to be evaluated.
* - Prune the schema of the data requested based on any projections present. Today this pruning
* is only done on top level columns, but formats should support pruning of nested columns as
* well.
* - Construct a reader function by passing filters and the schema into the FileFormat.
 * - Using the partition pruning predicates, enumerate the list of files that should be read.
* - Split the files into tasks and construct a FileScanRDD.
* - Add any projection or filters that must be evaluated after the scan.
*
* Files are assigned into tasks using the following algorithm:
* - If the table is bucketed, group files by bucket id into the correct number of partitions.
* - If the table is not bucketed or bucketing is turned off:
* - If any file is larger than the threshold, split it into pieces based on that threshold
* - Sort the files by decreasing file size.
* - Assign the ordered files to buckets using the following algorithm. If the current partition
* is under the threshold with the addition of the next file, add it. If not, open a new bucket
* and add it. Proceed to the next file.
*/
object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
// - bucket keys only - optionally used to prune files to read
// - keys stored in the data only - optionally used to skip groups of data in files
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)
// The attribute names in the predicates may differ from those in the schema when analysis is
// case-insensitive, so normalize them to match the schema; after that we no longer need to
// worry about case sensitivity.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
val partitionColumns =
l.resolve(
fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
val requiredAttributes = AttributeSet(requiredExpressions)
val readDataColumns =
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
FileSourceScanExec(
fsRelation,
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
dataFilters,
table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
val withProjections = if (projects == withFilter.output) {
withFilter
} else {
execution.ProjectExec(projects, withFilter)
}
withProjections :: Nil
case _ => Nil
}
}
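To watch these filter categories in action, write a partitioned table and look at the scan node of the plan; a sketch with a made-up path and columns:

import spark.implicits._

// Hypothetical partitioned dataset: directories /tmp/events/p=0 ... /tmp/events/p=9
spark.range(1000).withColumn("p", $"id" % 10).write.partitionBy("p").parquet("/tmp/events")

val q = spark.read.parquet("/tmp/events")
  .where($"p" === 3 && $"id" > 500)   // p === 3 is a partition-key filter, id > 500 a data filter
  .select("id")

// The FileScan line of the physical plan should show something like
// PartitionFilters: [isnotnull(p#..), (p#.. = 3)], PushedFilters: [IsNotNull(id), GreaterThan(id,500)]
q.explain()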
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

/**
 * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
 * preparations.
 *
 * Concrete implementations of SparkPlan should override `doExecute`.
 */
final def execute(): RDD[InternalRow] = executeQuery {
  doExecute()
}

From here the call chain is:

- org.apache.spark.sql.execution.WholeStageCodegenExec#doExecute
- org.apache.spark.sql.execution.WholeStageCodegenExec#doCodeGen
- org.apache.spark.sql.execution.CodegenSupport#produce
- org.apache.spark.sql.execution.CodegenSupport#doProduce
SparkPlan is the base class for physical operators. The naming convention is that physical operators end with an "Exec" suffix, e.g. ProjectExec.
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
/**
* Execute a query after preparing the query and adding query plan information to created RDDs
* for visualization.
*/
protected final def executeQuery[T](query: => T): T = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
waitForSubqueries()
query
}
}
/**
* Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
* preparations.
*
* Concrete implementations of SparkPlan should override `doExecute`.
*/
final def execute(): RDD[InternalRow] = executeQuery {
doExecute()
}
/**
* Overridden by concrete implementations of SparkPlan.
* Produces the result of the query as an RDD[InternalRow]
*/
protected def doExecute(): RDD[InternalRow]
/**
* Prepare a SparkPlan for execution. It's idempotent.
*/
final def prepare(): Unit = {
// doPrepare() may depend on its children, we should call prepare() on all the children first.
children.foreach(_.prepare())
synchronized {
if (!prepared) {
prepareSubqueries()
doPrepare()
prepared = true
}
}
}
/**
* Overridden by concrete implementations of SparkPlan. It is guaranteed to run before any
* `execute` of SparkPlan. This is helpful if we want to set up some state before executing the
* query, e.g., `BroadcastHashJoin` uses it to broadcast asynchronously.
*
* Note: the prepare method has already walked down the tree, so the implementation doesn't need
* to call children's prepare methods.
*
* This will only be called once, protected by `this`.
*/
protected def doPrepare(): Unit = {}
}
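A quick way to see the Exec naming convention and the operator tree is to walk the prepared plan; a sketch assuming the JSON from earlier has an age column:

import spark.implicits._

// Print the class name of every operator in the executed plan, top-down.
df.where($"age" > 21).queryExecution.executedPlan.foreach { op =>
  println(op.getClass.getSimpleName)   // e.g. WholeStageCodegenExec, FilterExec, FileSourceScanExec
}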
WholeStageCodegenExec(this).execute()

Note this line; it is the whole-stage-codegen branch of FileSourceScanExec#doExecute shown below.
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
protected override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
// in the case of fallback, this batched scan should never fail because of:
// 1) only primitive types are supported
// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
WholeStageCodegenExec(this).execute()
} else {
val unsafeRows = {
val scan = inputRDD
if (needsUnsafeRowConversion) {
scan.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
} else {
scan
}
}
val numOutputRows = longMetric("numOutputRows")
unsafeRows.map { r =>
numOutputRows += 1
r
}
}
}
}
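Which branch of doExecute you land in is easy to observe by toggling whole-stage codegen; a sketch, reusing the query q from the partitioned-table example:

spark.conf.set("spark.sql.codegen.wholeStage", "true")
q.explain()   // operators inside a codegen stage are typically marked with a '*' prefix

spark.conf.set("spark.sql.codegen.wholeStage", "false")
q.explain()   // same operators, but each one now runs its own doExecute() row by row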
trait CodegenSupport extends SparkPlan {
/**
* Returns Java source code to process the rows from input RDD.
*/
final def produce(ctx: CodegenContext, parent: CodegenSupport): String = executeQuery {
this.parent = parent
ctx.freshNamePrefix = variablePrefix
s"""
|${ctx.registerComment(s"PRODUCE: ${this.simpleString}")}
|${doProduce(ctx)}
""".stripMargin
}
/**
* Generate the Java source code to process, should be overridden by subclass to support codegen.
*
* doProduce() usually generate the framework, for example, aggregation could generate this:
*
* if (!initialized) {
* # create a hash map, then build the aggregation hash map
* # call child.produce()
* initialized = true;
* }
* while (hashmap.hasNext()) {
* row = hashmap.next();
* # build the aggregation results
* # create variables for results
* # call consume(), which will call parent.doConsume()
* if (shouldStop()) return;
* }
*/
protected def doProduce(ctx: CodegenContext): String
/**
* Consume the generated columns or row from current SparkPlan, call its parent's `doConsume()`.
*/
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
val inputVars =
if (row != null) {
ctx.currentVars = null
ctx.INPUT_ROW = row
output.zipWithIndex.map { case (attr, i) =>
BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
}
} else {
assert(outputVars != null)
assert(outputVars.length == output.length)
// outputVars will be used to generate the code for UnsafeRow, so we should copy them
outputVars.map(_.copy())
}
val rowVar = if (row != null) {
ExprCode("", "false", row)
} else {
if (outputVars.nonEmpty) {
val colExprs = output.zipWithIndex.map { case (attr, i) =>
BoundReference(i, attr.dataType, attr.nullable)
}
val evaluateInputs = evaluateVariables(outputVars)
// generate the code to create a UnsafeRow
ctx.INPUT_ROW = row
ctx.currentVars = outputVars
val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
val code = s"""
|$evaluateInputs
|${ev.code.trim}
""".stripMargin.trim
ExprCode(code, "false", ev.value)
} else {
// There is no columns
ExprCode("", "false", "unsafeRow")
}
}
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
|${parent.doConsume(ctx, inputVars, rowVar)}
""".stripMargin
}
/**
* Generate the Java source code to process the rows from child SparkPlan.
*
 * This should be overridden by the subclass to support codegen.
*
* For example, Filter will generate the code like this:
*
* # code to evaluate the predicate expression, result is isNull1 and value2
* if (isNull1 || !value2) continue;
* # call consume(), which will call parent.doConsume()
*
* Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input).
*/
def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
throw new UnsupportedOperationException
}
}
/**
 * WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java
 * function.
 *
 * Here is the call graph for generating the Java source (plan A supports codegen, but plan B does not):
*
 *   WholeStageCodegen       Plan A               FakeInput        Plan B
 * =========================================================================
 *
 * -> execute()
 *     |
 *  doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
 *     |
 *     +----------------->   produce()
 *                             |
 *                          doProduce()  -------> produce()
 *                                                   |
 *                                                doProduce()
 *                                                   |
 *                         doConsume() <--------- consume()
 *                             |
 *  doConsume()  <--------  consume()
*
* SparkPlan A should override doProduce() and doConsume().
*
* doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
 * used to generate code for BoundReference.
*/
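To look at the Java function this call graph actually produces, the debug helpers are handy; a sketch, reusing q from above:

import org.apache.spark.sql.execution.debug._

// Prints each whole-stage codegen subtree together with its generated Java source,
// so you can match the produce()/consume() flow above against real output.
q.debugCodegen()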