Spark SQL | Spark,从入门到精通

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。

 / 发家史 / 

熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。

Spark SQL

Spark SQL  提供了多种接口:

  • 纯 Sql 文本;

  • dataset/dataframe api。

当然,相应的,也会有各种客户端:

  • sql 文本,可以用 thriftserver/spark-sql;

  • 编码,Dataframe/dataset/sql。

/ Dataframe/Dataset API 简介 / 

Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。

可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:

Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:

type DataFrame = Dataset[Row]

所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。

基本操作

val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”) df.show() import spark.implicits._ df.printSchema() df.select("name").show() df.select($"name", $"age" + 1).show() df.filter($"age" > 21).show() df.groupBy("age").count().show() spark.stop()

分区分桶 排序

分桶排序保存hive表 df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”) 分区以parquet输出到指定目录 df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 分区分桶保存到hive表 df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

cube rullup pivot

cube sales.cube("city", "year”).agg(sum("amount")as "amount”) .show() rull up sales.rollup("city", "year”).agg(sum("amount")as "amount”).show() pivot 只能跟在groupby之后 sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

 / SQL 编程 / 

Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写  SQL 文本:

1. spark 代码

2. spark-sql的shell

3. thriftserver

支持 Spark SQL 自身的语法,同时也兼容 HSQL。

1. 编码

要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。

SQLContext

new SQLContext(SparkContext)

HiveContext

new HiveContext(spark.sparkContext)

SparkSession

不使用 hive 元数据:

val spark = SparkSession.builder()  .config(sparkConf) .getOrCreate()

使用 hive 元数据:

val spark = SparkSession.builder()  .config(sparkConf) .enableHiveSupport().getOrCreate()

使用

val df =spark.read.json("examples/src/main/resources/people.json")  df.createOrReplaceTempView("people")  spark.sql("SELECT * FROM people").show()

2. spark-sql 脚本

spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用

bin/spark-sql –help 查看配置参数。 

需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试

show tables; select count(*) from student;

3. thriftserver

thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。

安装部署

/1 开启 hive 的 metastore

bin/hive --service metastore

/2 将配置文件复制到spark/conf/目录下

/3 thriftserver

sbin/start-thriftserver.sh --masteryarn  --deploy-mode client

对于 yarn 只支持 client 模式。

/4 启动 bin/beeline

/5 连接到 thriftserver

!connect jdbc:hive2://localhost:10001

 / 用户自定义函数 / 

1. UDF

定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:

val len = udf{(str:String) => str.length} spark.udf.register("len",len) val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.createOrReplaceTempView("employees") ds.show() spark.sql("select len(name) from employees").show()

2. UserDefinedAggregateFunction

定义一个 UDAF

import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverageUDAF extends UserDefinedAggregateFunction {  //Data types of input arguments of this aggregate function  definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)  //Data types of values in the aggregation buffer  defbufferSchema:StructType = {    StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)  }  //The data type of the returned value  defdataType:DataType = DoubleType  //Whether this function always returns the same output on the identical input  defdeterministic: Boolean = true  //Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to  // standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides  // the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still  // immutable.  definitialize(buffer:MutableAggregationBuffer): Unit = {    buffer(0) = 0L    buffer(1) = 0L  }  //Updates the given aggregation buffer `buffer` with new input data from `input`  defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={    if(!input.isNullAt(0)) {      buffer(0) = buffer.getLong(0)+ input.getLong(0)      buffer(1) = buffer.getLong(1)+ 1    }  }  // Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`  defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={    buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)    buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)  }  //Calculates the final result  defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1) }

使用 UDAF

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.createOrReplaceTempView("employees") ds.show() spark.udf.register("myAverage", MyAverageUDAF) val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show()

3. Aggregator

定义一个 Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.expressions.Aggregator case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverageAggregator extends Aggregator[Employee, Average, Double] {  // A zero value for this aggregation. Should satisfy the property that any b + zero = b  def zero: Average = Average(0L, 0L)  // Combine two values to produce a new value. For performance, the function may modify `buffer`  // and return it instead of constructing a new object  def reduce(buffer: Average, employee: Employee): Average = {    buffer.sum += employee.salary    buffer.count += 1    buffer  }  // Merge two intermediate values  def merge(b1: Average, b2: Average): Average = {    b1.sum += b2.sum    b1.count += b2.count    b1  }  // Transform the output of the reduction  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count  // Specifies the Encoder for the intermediate value type  def bufferEncoder: Encoder[Average] = Encoders.product  // Specifies the Encoder for the final output value type  def outputEncoder: Encoder[Double] = Encoders.scalaDouble }

使用

spark.udf.register("myAverage2", MyAverageAggregator) import spark.implicits._ val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee] ds.show() val averageSalary = MyAverageAggregator.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show()

 / 数据源 / 

1. 通用的 laod/save 函数
可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。

2. Parquet 文件

val parquetFileDF =spark.read.parquet("people.parquet")  peopleDF.write.parquet("people.parquet")

3. ORC 文件

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.write.mode("append").orc("/opt/outputorc/") spark.read.orc("/opt/outputorc/*").show(1)

4. JSON

ds.write.mode("overwrite").json("/opt/outputjson/") spark.read.json("/opt/outputjson/*").show()

5. Hive 表

spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。

val spark = SparkSession .builder() .config(sparkConf) .enableHiveSupport() .getOrCreate() spark.sql("select count(*) from student").show()

6. JDBC

写入 mysql

wcdf.repartition(1).write.mode("append").option("user", "root")  .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

从 mysql 里读

val fromMysql = spark.read.option("user", "root")  .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

7. 自定义数据源

自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} class DefaultSource  extends DataSourceV2 with ReadSupport {  def createReader(options: DataSourceOptions) = new SimpleDataSourceReader() }

import org.apache.spark.sql.Row import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader} import org.apache.spark.sql.types.{StringType, StructField, StructType} class SimpleDataSourceReader extends DataSourceReader {  def readSchema() = StructType(Array(StructField("value", StringType)))  def createDataReaderFactories = {    val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]    factoryList.add(new SimpleDataSourceReaderFactory())    factoryList  } }

import org.apache.spark.sql.Row import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} class SimpleDataSourceReaderFactory extends  DataReaderFactory[Row] with DataReader[Row] {  def createDataReader = new SimpleDataSourceReaderFactory()  val values = Array("1", "2", "3", "4", "5")  var index = 0  def next = index < values.length  def get = {    val row = Row(values(index))    index = index + 1    row  }  def close() = Unit }

使用

val simpleDf = spark.read  .format("bigdata.spark.SparkSQL.DataSources")  .load() simpleDf.show()

 / 优化器及执行计划 / 

1. 流程简介

总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。

简单化成四个部分:

/1 analysis

Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。

/2 logical optimization

常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。

/3 physical planning

eg:SortExec 。         

/4 Codegen

codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。

2. 自定义优化器

/1 实现

继承 Rule[LogicalPlan]

object MultiplyOptimizationRule extends Rule[LogicalPlan] {    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {      case Multiply(left,right) if right.isInstanceOf[Literal] &&        right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>        println("=========> optimization of one applied")        left    }  }      spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)    val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")    println("after optimization")


/2 注册

spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)

/3 使用

selectExpr("amountPaid* 1")

3. 自定义执行计划

/1 物理计划

继承 SparkLan 实现 doExecute 方法。

/2 逻辑计划

继承 SparkStrategy 实现 apply。

case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan {  override def children: Seq[SparkPlan] = Nil  override protected def doExecute(): RDD[InternalRow] = {    val row = org.apache.spark.sql.Row("hi",12L)    val unsafeRow = toUnsafeRow(row, Array(org.apache.spark.sql.types.StringType,org.apache.spark.sql.types.LongType))    sparkContext.parallelize(Seq(unsafeRow),1)  }  def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = {    val converter = unsafeRowConverter(schema)    converter(row)  }  def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = {    val converter = org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(schema)    (row: org.apache.spark.sql.Row) => {      converter(org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[org.apache.spark.sql.catalyst.InternalRow])    }  } } case object NeverPlanned extends LeafNode {  override def output: Seq[Attribute] = Nil } object TestStrategy extends Strategy {  def apply(plan: LogicalPlan): Seq[SparkPlan] =    plan match {      case Project(pblist, child) =>        println("mt fastOperator ------------>")        FastOperator(pblist.map(_.toAttribute),planLater(child)) :: Nil      case Union(children) =>        println("mt union ========>")        UnionExec(children.map(planLater)) :: Nil      case LocalRelation(output, data, _) =>        LocalTableScanExec(output, data):: Nil      case _ => Nil  } }

/3 注册到 Spark 执行策略

spark.experimental.extraStrategies =Seq(countStrategy)

/4 使用

spark.sql("select count(*) fromtest")

美图数据技术团队
美图数据技术团队

美图拥有海量的用户数据,如何将美图的数据最大化的利用在各个场景,更大范围的发挥美图在数据上的优势,并挖掘数据无限的价值是我们的使命。

入门SparkSpark SQL
4
相关数据
模式匹配技术

在计算机科学中,模式匹配就是检查特定序列的标记是否存在某种模式的组成部分。 与模式识别相比,匹配通常必须是精确的。 模式通常具有序列或树结构的形式。 模式匹配的使用包括输出令牌序列内的模式的位置(如果有的话),输出匹配模式的某个分量,以及用另一个令牌序列(即搜索和替换)替换匹配模式。

参数技术

在数学和统计学裡,参数(英语:parameter)是使用通用变量来建立函数和变量之间关系(当这种关系很难用方程来阐述时)的一个数量。

分桶技术

将一个特征(通常是连续特征)转换成多个二元特征(称为桶或箱),通常是根据值区间进行转换。例如,您可以将温度区间分割为离散分箱,而不是将温度表示成单个连续的浮点特征。假设温度数据可精确到小数点后一位,则可以将介于 0.0 到 15.0 度之间的所有温度都归入一个分箱,将介于 15.1 到 30.0 度之间的所有温度归入第二个分箱,并将介于 30.1 到 50.0 度之间的所有温度归入第三个分箱。

插值技术

数学的数值分析领域中,内插或称插值(英语:interpolation)是一种通过已知的、离散的数据点,在范围内推求新数据点的过程或方法。求解科学和工程的问题时,通常有许多数据点借由采样、实验等方法获得,这些数据可能代表了有限个数值函数,其中自变量的值。而根据这些数据,我们往往希望得到一个连续的函数(也就是曲线);或者更密集的离散方程与已知数据互相吻合,这个过程叫做拟合。

逻辑技术

人工智能领域用逻辑来理解智能推理问题;它可以提供用于分析编程语言的技术,也可用作分析、表征知识或编程的工具。目前人们常用的逻辑分支有命题逻辑(Propositional Logic )以及一阶逻辑(FOL)等谓词逻辑。

桶排序技术

桶排序或所谓的箱排序,是一个排序算法,工作的原理是将数组分到有限数量的桶里。每个桶再个别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序进行排序)。桶排序是鸽巢排序的一种归纳结果。当要被排序的数组内的数值是均匀分配的时候,桶排序使用线性时间。

优化器技术

优化器基类提供了计算梯度loss的方法,并可以将梯度应用于变量。优化器里包含了实现了经典的优化算法,如梯度下降和Adagrad。 优化器是提供了一个可以使用各种优化算法的接口,可以让用户直接调用一些经典的优化算法,如梯度下降法等等。优化器(optimizers)类的基类。这个类定义了在训练模型的时候添加一个操作的API。用户基本上不会直接使用这个类,但是你会用到他的子类比如GradientDescentOptimizer, AdagradOptimizer, MomentumOptimizer(tensorflow下的优化器包)等等这些算法。

暂无评论
暂无评论~