Shockang's blog: Interpreting Dataset from the Perspective of the Spark Source Code
This article belongs to the column "1000 Questions to Master the Big Data Technology Stack" (《1000个问题搞定大数据技术体系》), which is the author's original work. Please credit the source when quoting, and feel free to point out any shortcomings or mistakes in the comments. Thanks!
For the column's table of contents and references, see 1000个问题搞定大数据技术体系.
Suppose two rows of data in an RDD look like this (the original post illustrates each case with a figure): each row is an opaque record whose internal structure Spark cannot see.
In a DataFrame, the same data looks like this: each row is a Row with named, typed columns.
In a Dataset, the data looks like this, or like this (each row is a typed object).
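The three representations described above can be sketched in a few lines of Spark code. This is a minimal sketch, assuming a local SparkSession; the `Person` case class and the sample rows are hypothetical, not taken from the original post.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example type, not from the original post.
case class Person(name: String, age: Int)

object RepresentationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    // RDD[Person]: each row is an opaque JVM object; Spark sees no column structure.
    val rdd = spark.sparkContext.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))

    // DataFrame (= Dataset[Row]): rows carry a schema; fields are accessed untyped.
    val df = rdd.toDF()                        // columns: name, age
    val name = df.head().getAs[String]("name") // typed only by the caller's cast

    // Dataset[Person]: rows are typed objects, checked by the compiler.
    val ds = df.as[Person]
    val age: Int = ds.head().age

    spark.stop()
  }
}
```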
Dataset subsumes the functionality of DataFrame. The two were unified in Spark 2.0: a DataFrame is represented as Dataset[Row], i.e., a special case of Dataset. Compared with DataFrame:
(1) a Dataset is type-checked at compile time;
(2) it provides an object-oriented programming interface.
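Point (1) is worth making concrete: a typo against a typed Dataset is rejected by the Scala compiler, while the same typo against a DataFrame compiles and only fails when the query is analyzed at runtime. A minimal sketch, assuming a local SparkSession and a hypothetical `Person` type:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int) // hypothetical example type

object CompileTimeCheckDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()
    val df = ds.toDF()

    // Typed API: field access is verified by the Scala compiler.
    val older = ds.map(p => p.age + 1)
    // ds.map(p => p.agee)   // would not compile: value agee is not a member of Person

    // Untyped API: a misspelled column name compiles fine and fails only at runtime.
    // df.select("agee")     // throws AnalysisException when the query is analyzed

    spark.stop()
  }
}
```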
For more on DataFrame, see my post "What is a DataFrame?".
For the differences between RDD and DataFrame, see my post "What is the difference between RDD and DataFrame?".
/**
* A Dataset is a strongly typed collection of domain-specific objects that can be transformed
* in parallel using functional or relational operations. Each Dataset also has an untyped view
* called a `DataFrame`, which is a Dataset of [[Row]].
*
* Operations available on Datasets are divided into transformations and actions. Transformations
* are the ones that produce new Datasets, and actions are the ones that trigger computation and
* return results. Example transformations include map, filter, select, and aggregate (`groupBy`).
 * Example actions include count, show, or writing data out to file systems.
*
* Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
* a Dataset represents a logical plan that describes the computation required to produce the data.
* When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
* physical plan for efficient execution in a parallel and distributed manner. To explore the
* logical plan as well as optimized physical plan, use the `explain` function.
*
* To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
* the domain specific type `T` to Spark's internal type system. For example, given a class `Person`
* with two fields, `name` (string) and `age` (int), an encoder is used to tell Spark to generate
* code at runtime to serialize the `Person` object into a binary structure. This binary structure
* often has much lower memory footprint as well as are optimized for efficiency in data processing
* (e.g. in a columnar format). To understand the internal binary representation for data, use the
* `schema` function.
*
* There are typically two ways to create a Dataset. The most common way is by pointing Spark
* to some files on storage systems, using the `read` function available on a `SparkSession`.
* {{{
* val people = spark.read.parquet("...").as[Person] // Scala
* Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
* }}}
*
 * Datasets can also be created through transformations available on existing Datasets. For example,
 * the following creates a new Dataset by applying a `map` on the existing one:
* {{{
* val names = people.map(_.name) // in Scala; names is a Dataset[String]
 * Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING);
* }}}
*
* Dataset operations can also be untyped, through various domain-specific-language (DSL)
* functions defined in: Dataset (this class), [[Column]], and [[functions]]. These operations
* are very similar to the operations available in the data frame abstraction in R or Python.
*
* To select a column from the Dataset, use `apply` method in Scala and `col` in Java.
* {{{
* val ageCol = people("age") // in Scala
* Column ageCol = people.col("age"); // in Java
* }}}
*
* Note that the [[Column]] type can also be manipulated through its various functions.
* {{{
* // The following creates a new column that increases everybody's age by 10.
* people("age") + 10 // in Scala
* people.col("age").plus(10); // in Java
* }}}
*
* A more concrete example in Scala:
* {{{
* // To create Dataset[Row] using SparkSession
* val people = spark.read.parquet("...")
* val department = spark.read.parquet("...")
*
* people.filter("age > 30")
* .join(department, people("deptId") === department("id"))
* .groupBy(department("name"), people("gender"))
* .agg(avg(people("salary")), max(people("age")))
* }}}
*
* and in Java:
* {{{
* // To create Dataset<Row> using SparkSession
* Dataset<Row> people = spark.read().parquet("...");
* Dataset<Row> department = spark.read().parquet("...");
*
* people.filter(people.col("age").gt(30))
* .join(department, people.col("deptId").equalTo(department.col("id")))
* .groupBy(department.col("name"), people.col("gender"))
* .agg(avg(people.col("salary")), max(people.col("age")));
* }}}
*
* @groupname basic Basic Dataset functions
* @groupname action Actions
* @groupname untypedrel Untyped transformations
* @groupname typedrel Typed transformations
*
* @since 1.6.0
*/
@Stable
class Dataset[T] private[sql](
@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
@DeveloperApi @Unstable @transient val encoder: Encoder[T])
extends Serializable
A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations.
Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.
The operations available on a Dataset are divided into transformations and actions.
Transformations produce new Datasets; actions trigger computation and return results.
For example, transformations include map, filter, select, and aggregate (groupBy).
Actions include count, show, and writing data out to a file system.
A Dataset is "lazy": computation is triggered only when an action is invoked.
Internally, a Dataset represents a logical plan that describes the computation required to produce the data.
When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner.
To explore the logical plan as well as the optimized physical plan, use the explain function.
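The laziness and the explain function can be seen together in a short sketch. This assumes an existing SparkSession named `spark`; the Parquet path is hypothetical.

```scala
// Assumes an existing SparkSession `spark`; the Parquet path is hypothetical.
val people = spark.read.parquet("/data/people")

// Building the query only constructs a logical plan; nothing is computed yet.
val adults = people.filter("age > 30")

// Print the parsed, analyzed, and optimized logical plans plus the physical plan.
adults.explain(true)

// Only an action (count, show, write, ...) triggers actual execution.
println(adults.count())
```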
To support domain-specific objects efficiently, an Encoder is required.
The encoder maps the domain-specific type T to Spark's internal type system.
For example, given a class Person with two fields, name (string) and age (int), the encoder tells Spark to generate code at runtime that serializes a Person object into a binary structure.
This binary structure usually has a much smaller memory footprint and is optimized for efficient data processing (e.g., in a columnar format).
To understand the internal binary representation of the data, use the schema function.
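The encoder-derived schema can be inspected directly. A minimal sketch, assuming an existing SparkSession `spark` and the same hypothetical `Person` type as above:

```scala
// Assumes an existing SparkSession `spark`; Person is a hypothetical example type.
case class Person(name: String, age: Int)
import spark.implicits._

val ds = Seq(Person("Ann", 30)).toDS()

// The encoder-derived schema describes how the data is laid out internally.
ds.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)
```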
There are typically two ways to create a Dataset.
The most common is to point Spark at some files on a storage system, using the read function available on a SparkSession:
val people = spark.read.parquet("...").as[Person] // Scala
Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
A Dataset can also be created through the transformations available on an existing Dataset.
For example, the following creates a new Dataset by applying a map to an existing one:
val names = people.map(_.name) // in Scala; names is a Dataset[String]
Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING);
Dataset operations can also be untyped, via the various domain-specific-language (DSL) functions defined in Dataset (this class), [[Column]], and [[functions]].
These operations are very similar to those available on the data frame abstraction in R or Python.
To select a column from a Dataset, use the apply method in Scala and col in Java:
val ageCol = people("age") // in Scala
Column ageCol = people.col("age"); // in Java
Note that the [[Column]] type can also be manipulated through its various functions.
// The following creates a new column that increases everybody's age by 10.
people("age") + 10 // in Scala
people.col("age").plus(10); // in Java
A more concrete example, in Scala:
// create a Dataset[Row] using SparkSession
val people = spark.read.parquet("...")
val department = spark.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), people("gender"))
.agg(avg(people("salary")), max(people("age")))
And the same in Java:
// create a Dataset<Row> using SparkSession
Dataset<Row> people = spark.read().parquet("...");
Dataset<Row> department = spark.read().parquet("...");
people.filter(people.col("age").gt(30))
.join(department, people.col("deptId").equalTo(department.col("id")))
.groupBy(department.col("name"), people.col("gender"))
.agg(avg(people.col("salary")), max(people.col("age")));