The Way of Spark (Advanced): Spark Source Code Reading, Section 12: Spark SQL Processing Flow Analysis...
Published: 2019-06-20


Author: 周志湖

The following code demonstrates defining a table schema with a case class:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
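The comment above mentions the 22-field limit on Scala 2.10 case classes. As a sketch of the usual workaround (not part of the original article), the schema can also be built programmatically with StructType, avoiding case classes entirely; the file path and column names below simply reuse the example above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Build the schema explicitly instead of deriving it from a case class
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// Convert each text line into a Row matching the schema
val rowRDD = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1).trim.toInt))

// createDataFrame pairs the RDD[Row] with the explicit schema
val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
peopleDF.registerTempTable("people")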

(1) The sql method returns a DataFrame

def sql(sqlText: String): DataFrame = {
  DataFrame(this, parseSql(sqlText))
}

Here parseSql(sqlText) produces the corresponding LogicalPlan. Its source is as follows:

// Generate a LogicalPlan from the given SQL string
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

The ddlParser object is defined as follows:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
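Although parseSql itself is protected[sql], the raw parsed plan it produces can be observed from user code through queryExecution.logical. A minimal sketch, assuming the people table registered earlier:

// The unanalyzed LogicalPlan straight out of the parser chain
// (DDLParser first, delegating to SparkSQLParser and the SQL dialect)
val df = sqlContext.sql("SELECT name FROM people")
println(df.queryExecution.logical)
// 'Project [unresolvedalias('name)]
//  'UnresolvedRelation [people], None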

(2) DataFrame's apply method is then invoked

private[sql] object DataFrame {
  def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
    new DataFrame(sqlContext, logicalPlan)
  }
}

As you can see, apply takes two parameters, a SQLContext and a LogicalPlan, and calls the DataFrame constructor. Its source is as follows:

// DataFrame constructor. It wraps the LogicalPlan in a QueryExecution object
// and, when eager analysis is enabled, analyzes the plan immediately.
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
  this(sqlContext, {
    val qe = sqlContext.executePlan(logicalPlan)
    // If eager analysis is enabled, force analysis now and throw on any error
    if (sqlContext.conf.dataFrameEagerAnalysis) {
      qe.assertAnalyzed()  // This should force analysis and throw errors if there are any
    }
    qe
  })
}
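The dataFrameEagerAnalysis flag checked above is backed, in this version of Spark, by the SQL configuration key spark.sql.eagerAnalysis (an assumption about the exact key name). As an illustrative sketch, disabling it defers analysis errors from DataFrame construction to execution time:

// Assumed config key for SQLConf.dataFrameEagerAnalysis in Spark 1.x
sqlContext.setConf("spark.sql.eagerAnalysis", "false")

// With eager analysis disabled, constructing the DataFrame succeeds...
val bad = sqlContext.sql("SELECT no_such_column FROM people")

// ...and the AnalysisException only surfaces once analysis is forced
// bad.collect()  // throws: cannot resolve 'no_such_column'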

(3) val qe = sqlContext.executePlan(logicalPlan) returns a QueryExecution. The sqlContext.executePlan method's source is as follows:

protected[sql] def executePlan(plan: LogicalPlan) =
  new sparkexecution.QueryExecution(this, plan)

The QueryExecution class captures the main workflow by which Spark executes SQL:

class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

  @VisibleForTesting
  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)

  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)

  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    sqlContext.cacheManager.useCachedData(analyzed)
  }

  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)

  // TODO: Don't just pick the first one...
  lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(sqlContext)
    sqlContext.planner.plan(optimizedPlan).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  // Calling toRdd runs the plan and exposes the result as an RDD
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String = {
    s"""== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim
  }

  override def toString: String = {
    def output =
      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

    s"""== Parsed Logical Plan ==
       |${stringOrError(logical)}
       |== Analyzed Logical Plan ==
       |${stringOrError(output)}
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
    """.stripMargin.trim
  }
}
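Because every stage in QueryExecution is a lazy val, each one can be forced individually from the shell. A small sketch, reusing the teenagers DataFrame from the first example:

val qe = teenagers.queryExecution

println(qe.logical)        // parsed plan, straight from the parser
println(qe.analyzed)       // resolved against the catalog by the Analyzer
println(qe.optimizedPlan)  // after Optimizer rules (e.g. predicate pushdown)
println(qe.executedPlan)   // physical SparkPlan ready to run

// toRdd builds the executable RDD lineage; a job only runs
// once an action is invoked on that RDD
val internalRows = qe.toRdd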

As the code shows, the SQL execution flow is:

1. Parsed Logical Plan: the LogicalPlan held in the logical field
2. Analyzed Logical Plan: lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
3. Optimized Logical Plan: lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
4. Physical Plan: lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

These plans can be inspected by calling results.queryExecution:

scala> results.queryExecution
res1: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string
Project [name#0]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47

== Optimized Logical Plan ==
Project [name#0]
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47

== Physical Plan ==
TungstenProject [name#0]
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
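The same plans can also be printed with DataFrame.explain; passing true requests the extended output shown above rather than just the physical plan:

results.explain(true)   // parsed, analyzed, optimized and physical plans
results.explain()       // physical plan only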

(4) The DataFrame primary constructor then completes construction of the DataFrame:

class DataFrame private[sql](
    @transient val sqlContext: SQLContext,
    @DeveloperApi @transient val queryExecution: QueryExecution)
  extends Serializable

(5) When an action such as DataFrame.collect is invoked, executedPlan is triggered and the query actually runs:

def collect(): Array[Row] = withNewExecutionId {
  queryExecution.executedPlan.executeCollect()
}

For example:

scala> results.collect
res6: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
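To summarize the laziness boundary, here is a small sketch (the query below is an illustrative variation on the people table, not from the original article): building a DataFrame only assembles plans, and execution is deferred until an action such as collect, count, or show triggers executedPlan:

// Lazy: only constructs a QueryExecution, no Spark job yet
val adults = sqlContext.sql("SELECT name FROM people WHERE age > 19")

// Actions trigger executedPlan.execute() and run the job
adults.show()             // prints rows in tabular form
println(adults.count())   // number of matching rows
val rows = adults.collect()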

The overall flow is illustrated below:

[Figure: overall Spark SQL execution flow diagram]
