SparkSQL Manual
Section 1
# Reading files
scala> val df = spark.read.json("E:/project/python/data/user.json")
scala> val df = spark.read.format("json").load("E:/project/python/data/user.json")
# Querying data
scala> df.show
+---+------+---+
|age| name|sex|
+---+------+---+
| 23|Calong| 男|
| 21| Test| 男|
| 22| Cat| 女|
+---+------+---+
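Note: by default the JSON source expects one JSON object per line (JSON Lines format). For a pretty-printed file spanning multiple lines, the standard multiLine reader option can be enabled; a minimal sketch assuming the same file:
scala> val df = spark.read.option("multiLine", "true").json("E:/project/python/data/user.json")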
# Creating a session-scoped temp view from a DataFrame
scala> df.createOrReplaceTempView("user")
# Creating a global temp view from a DataFrame
scala> df.createOrReplaceGlobalTempView("user")
# Querying data from the temp view
scala> spark.sql("select * from user").show
+---+------+---+
|age| name|sex|
+---+------+---+
| 23|Calong| 男|
| 21| Test| 男|
| 22| Cat| 女|
+---+------+---+
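Note: the global temp view created above is registered in the system database global_temp and is shared across sessions, so it must be queried with that prefix. A minimal sketch (output omitted):
scala> spark.sql("select * from global_temp.user").show
scala> spark.newSession().sql("select * from global_temp.user").show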
# Inspecting the schema
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)
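If schema inference is not wanted, a schema can be passed explicitly to the reader. A sketch assuming the same user.json layout:
scala> import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(StructField("age", LongType), StructField("name", StringType), StructField("sex", StringType)))
scala> val df2 = spark.read.schema(schema).json("E:/project/python/data/user.json")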
# Selecting a single column
scala> df.select("age").show
+---+
|age|
+---+
| 23|
| 21|
| 22|
+---+
# Selecting with a column expression
scala> df.select($"age" + 1).show
+---------+
|(age + 1)|
+---------+
| 24|
| 22|
| 23|
+---------+
# Filtering rows
scala> df.filter($"age" > 22).show
+---+------+---+
|age| name|sex|
+---+------+---+
| 23|Calong| 男|
+---+------+---+
# Grouping and counting
scala> df.groupBy($"sex").count.show
+---+-----+
|sex|count|
+---+-----+
| 男| 2|
| 女| 1|
+---+-----+
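groupBy also accepts aggregate functions beyond count. A sketch using the built-in avg and max from org.apache.spark.sql.functions:
scala> import org.apache.spark.sql.functions._
scala> df.groupBy($"sex").agg(avg($"age"), max($"age")).show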
Note: a DataFrame carries no compile-time type information. It is just Dataset[Row], so column types are only checked at runtime, unlike a typed Dataset.
Section 2
# Converting an RDD to a DataFrame
scala> val rdd = sc.makeRDD(List((1, "One"), (2, "Two"), (3, "Three"), (4, "Four")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at makeRDD at <console>:24
scala> val df = rdd.toDF("num", "eng")
df: org.apache.spark.sql.DataFrame = [num: int, eng: string]
scala> df.show
+---+-----+
|num| eng|
+---+-----+
| 1| One|
| 2| Two|
| 3|Three|
| 4| Four|
+---+-----+
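Note: toDF relies on implicit conversions that spark-shell imports automatically. In a standalone application they must be imported from the SparkSession instance; a minimal sketch (appName and master values are placeholders):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
import spark.implicits._
val df = spark.sparkContext.makeRDD(List((1, "One"), (2, "Two"))).toDF("num", "eng")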
# Creating a Dataset from a case class
scala> case class Person(name: String, age: Int)
defined class Person
scala> val list = List(Person("Test", 18), Person("Calong", 20), Person("Mike", 21))
list: List[Person] = List(Person(Test,18), Person(Calong,20), Person(Mike,21))
scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
scala> ds.show
+------+---+
| name|age|
+------+---+
| Test| 18|
|Calong| 20|
| Mike| 21|
+------+---+
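Because the Dataset is typed as Person, it also supports lambda-style operations on the case class fields directly, unlike the column-based DataFrame API. A sketch:
scala> ds.filter(_.age > 19).map(_.name).show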
# Converting a DataFrame to a Dataset
scala> val df = spark.read.json("E:/project/python/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> case class User(name: String, age: BigInt, sex: String)
defined class User
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, name: string ... 1 more field]
scala> ds.show
+---+------+---+
|age| name|sex|
+---+------+---+
| 23|Calong| 男|
| 21| Test| 男|
| 22| Cat| 女|
+---+------+---+
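Note: as[User] matches columns by name, so the field order in the case class does not have to match the DataFrame; declaring age as Long would also satisfy the inferred bigint column. A sketch with a hypothetical User2:
scala> case class User2(name: String, age: Long, sex: String)
scala> val ds2 = df.as[User2]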
# Converting a Dataset to a DataFrame
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> df.show
+---+------+---+
|age| name|sex|
+---+------+---+
| 23|Calong| 男|
| 21| Test| 男|
| 22| Cat| 女|
+---+------+---+
# Converting an RDD to a Dataset
scala> val rdd = sc.makeRDD(List(Person("Test", 18), Person("Calong", 20), Person("Mike", 21)))
rdd: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[12] at makeRDD at <console>:26
scala> rdd.toDS
res3: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
scala> res3.show
+------+---+
| name|age|
+------+---+
| Test| 18|
|Calong| 20|
| Mike| 21|
+------+---+
# Converting a Dataset to an RDD
scala> val rdd = res3.rdd
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[18] at rdd at <console>:25
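Calling .rdd on an untyped DataFrame instead yields RDD[Row], whose fields are read by name or position. A sketch reusing the Dataset above:
scala> val rowRdd = res3.toDF.rdd
scala> rowRdd.map(_.getAs[String]("name")).collect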
Section 3
# Reading Parquet data (the default format for load)
scala> val df = spark.read.load("D:/hadoop/spark/spark-3.0.1-bin-hadoop2.7/examples/src/main/resources/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
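load with no format option falls back to Spark's default data source, parquet; the dedicated shorthand is equivalent:
scala> val df = spark.read.parquet("D:/hadoop/spark/spark-3.0.1-bin-hadoop2.7/examples/src/main/resources/users.parquet")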
# Writing data out
scala> df.write.json("E:/output")
# Save modes (append / overwrite / ignore; the default, errorIfExists, fails if the path already exists)
scala> df.write.mode("append").json("E:/output")
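The same mode call applies to any output format, for example overwriting a parquet directory (the output path below is a placeholder):
scala> df.write.mode("overwrite").parquet("E:/output_parquet")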
# Reading a CSV file
scala> val df = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("E:/project/python/data/people.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> val df = spark.read.option("sep", ";").option("inferSchema", "true").option("header", "true").csv("E:/project/python/data/people.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> df.show
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
+-----+---+---------+
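Writing back to CSV mirrors the read options; header and separator must be set explicitly if wanted (output path is a placeholder):
scala> df.write.option("header", "true").option("sep", ";").csv("E:/output_csv")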
This work is licensed under a CC license; when reposting, you must credit the author and link back to this article.