Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题,第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支(至于为何相关修改没有合并到Hive主线,我也不太清楚)。

Spark SQL解决了这两个问题。第一,Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。去年Spark summit上Catalyst的作者Michael Armbrust对Catalyst做了一个简要介绍:2013 | Spark Summit(知乎竟然不能自定义链接的文字?)。第二,相对于Shark,由于进一步削减了对Hive的依赖,Spark SQL不再需要自行维护打了patch的Hive分支。Shark后续将全面采用Spark SQL作为引擎,不仅仅是查询优化方面。

此外,除了兼容HQL、加速现有Hive数据的查询分析以外,Spark SQL还支持直接对原生RDD对象进行关系查询。同时,除了HQL以外,Spark SQL还内建了一个精简的SQL parser,以及一套Scala DSL。也就是说,如果只是使用Spark SQL内建的SQL方言或Scala DSL对原生RDD对象进行关系查询,用户在开发Spark应用时完全不需要依赖Hive的任何东西。

能够对原生RDD对象进行关系查询,个人认为大大降低了用户门槛。一方面当然是因为熟悉SQL的人比熟悉Spark API的人多,另一方面是因为Spark SQL之下有Catalyst驱动的查询计划优化引擎。虽然在很多方面Spark的性能完爆Hadoop MapReduce好几条街,但Spark的运行时模型也比MapReduce复杂不少,使得Spark应用的性能调优比较tricky。虽然从代码量上来看,Spark应用往往是对等的MR应用的好几分之一,但裸用Spark API开发高效Spark应用还是需要花些心思的。这就体现出Spark SQL的优势了:即便用户写出的查询不那么高效,Catalyst也可以自动应用一系列常见优化策略。

下面举一个简单的filter pushdown优化的例子,首先是一段用Spark API写成的查询逻辑:

case class Person(name: String, age: Int, salary: Double)

sc.textFile("input.txt")
  .map { line =>
    val Array(name, age, salary) = line.split(",")
    Person(name, age.toInt, salary.toDouble)
  }
  .map { case Person(name, _, salary) =>
    (name, salary)
  }
  .filter(_._2 > 30000)
  .map(_._1)
  .collect()

有经验的程序员会把上述逻辑中的filter挪到第二个map之前,这样第二个map要处理的元素可能就要少很多(假设月薪大于30,000的不多)。这种情况下,只能依靠程序员自身的经验来做优化。如果使用Spark SQL,上面这个例子里的逻辑翻译成HQL相当于:

SELECT name
FROM (
  SELECT name, salary
  FROM people
) t
WHERE t.salary > 30000

遇到这样的查询时,Catalyst会自动应用filter pushdown优化,将上述查询在执行计划层面改写成与下述HQL等价的查询:

SELECT name
FROM (
  SELECT name, salary
  FROM people
  WHERE salary > 30000
)

完整的Spark SQL程序则是:

case class Person(name: String, age: Int, salary: Double)

val people = sc.textFile("input.txt").map {
  val Array(name, age, salary) = line.split(",")
  Person(name, age.toInt, salary.toDouble)
}

people.registerAsTable("people")

val query: RDD[(String, Double)] = sql(
  """
  SELECT name
  FROM (
    SELECT name, salary
    FROM people
  ) t
  WHERE t.salary > 30000
  """)

val result = query.collect()

上面这个例子同时也可以看到,在Spark SQL中可以轻松混用SQL/HQL和Spark的RDD API。比如上面的query变量便是一个包含SQL查询结果的普通RDD对象,可以直接喂入MLlib作为后续机器学习算法的输入,而无须额外的跨系统转储成本。在单一系统内支持多种分析,这也是Spark最大的优势之一。

Spark SQL还很新,目前主要精力集中在性能和稳定性方面,功能方面也还不是非常完善,非常欢迎来自社区的贡献。其实Catalyst/Spark SQL里还有很多有趣的东西,四月份中国Spark技术峰会上会进一步介绍,这里就不多说了,否则到时候就没悬念了 🙂

— 完 —

本文作者:连城

【知乎日报】
你都看到这啦,快来点我嘛 Σ(▼□▼メ)

此问题还有 4 个回答,查看全部。
延伸阅读:
LBS数据库的架构是怎样的?
给游戏公司招数据分析师,要求 SQL, SAS or R,VBA, ODBC, English,这样的人有吗?

分享到