SparkSQL Manual

Section 1

# Read a file
scala> val df = spark.read.json("E:/project/python/data/user.json")

scala> val df = spark.read.format("json").load("E:/project/python/data/user.json")
# Query the data
scala> df.show

+---+------+---+
|age|  name|sex|
+---+------+---+
| 23|Calong|   |
| 21|  Test|   |
| 22|   Cat|   |
+---+------+---+
# Create a session-scoped temporary view from the DataFrame
scala> df.createOrReplaceTempView("user")
# Create a global temporary view from the DataFrame
scala> df.createOrReplaceGlobalTempView("user")
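
A global temporary view is tied to the application rather than the session and lives in the built-in global_temp database, so it must be queried with that qualifier; a minimal sketch:
# Global temporary views are referenced through the global_temp database
scala> spark.sql("select * from global_temp.user").show
# They stay visible in a new session of the same application
scala> spark.newSession().sql("select * from global_temp.user").show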
# Query data from the temporary view
scala> spark.sql("select * from user").show

+---+------+---+
|age|  name|sex|
+---+------+---+
| 23|Calong|   |
| 21|  Test|   |
| 22|   Cat|   |
+---+------+---+
# Print the schema
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
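
Besides printSchema, the column names and their types can be read off the DataFrame directly; a small sketch:
# Returns an Array[String] of column names and an Array of (name, type) pairs respectively
scala> df.columns
scala> df.dtypes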
# Select a column
scala> df.select("age").show
+---+
|age|
+---+
| 23|
| 21|
| 22|
+---+
# Select with a column expression
scala> df.select($"age" + 1).show
+---------+
|(age + 1)|
+---------+
|       24|
|       22|
|       23|
+---------+
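
The derived column can be given a readable name with alias; a small sketch (the name age_plus_one is just an illustration):
scala> df.select(($"age" + 1).alias("age_plus_one")).show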
# Filter rows
scala> df.filter($"age" > 22).show
+---+------+---+
|age|  name|sex|
+---+------+---+
| 23|Calong|   |
+---+------+---+
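
filter also accepts a SQL expression string; this sketch is equivalent to the query above:
scala> df.filter("age > 22").show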
# Group and count
scala> df.groupBy($"sex").count.show
+---+-----+
|sex|count|
+---+-----+
|   |    2|
|   |    1|
+---+-----+
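
Aggregates other than count can be applied after groupBy via agg; a sketch using functions from org.apache.spark.sql.functions:
scala> import org.apache.spark.sql.functions._
scala> df.groupBy($"sex").agg(avg($"age"), max($"age")).show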

A DataFrame is untyped at compile time: each element is a generic Row, whereas a Dataset[T] is bound to a concrete element type.
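
Concretely, values come out of a DataFrame through the Row API, with the caller asserting the type; a minimal sketch:
# Each element is an untyped Row; getAs fetches a field by name with a caller-supplied type
scala> df.map(row => row.getAs[String]("name")).show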

Section 2

# Convert an RDD to a DataFrame
scala> val rdd = sc.makeRDD(List((1, "One"), (2, "Two"), (3, "Three"), (4, "Four")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at makeRDD at <console>:24

scala> val df = rdd.toDF("num", "eng")
df: org.apache.spark.sql.DataFrame = [num: int, eng: string]

scala> df.show
+---+-----+
|num|  eng|
+---+-----+
|  1|  One|
|  2|  Two|
|  3|Three|
|  4| Four|
+---+-----+
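
toDF (and toDS below) come from implicit conversions that spark-shell imports automatically; in a standalone application you would import them from your own SparkSession. A sketch under that assumption (the app name and master are placeholders):
// In a standalone application rather than spark-shell:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
import spark.implicits._  // enables toDF/toDS and the $"col" syntax

val df = spark.sparkContext.makeRDD(List((1, "One"), (2, "Two"))).toDF("num", "eng")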
# Create a Dataset from a case class
scala> case class Person(name: String, age: Int)
defined class Person

scala> val list = List(Person("Test", 18), Person("Calong", 20), Person("Mike", 21))
list: List[Person] = List(Person(Test,18), Person(Calong,20), Person(Mike,21))

scala> val ds = list.toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

scala> ds.show
+------+---+
|  name|age|
+------+---+
|  Test| 18|
|Calong| 20|
|  Mike| 21|
+------+---+
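
Because the Dataset is typed to Person, transformations can use plain field access and are checked at compile time; a small sketch:
scala> ds.filter(_.age > 18).map(_.name).show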
# Convert a DataFrame to a Dataset
scala> val df = spark.read.json("E:/project/python/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> case class User(name: String, age: BigInt, sex: String)
defined class User

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, name: string ... 1 more field]

scala> ds.show
+---+------+---+
|age|  name|sex|
+---+------+---+
| 23|Calong|   |
| 21|  Test|   |
| 22|   Cat|   |
+---+------+---+
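
After as[User], each element is a User instance, so fields are read directly rather than looked up by column name; a minimal sketch:
scala> ds.map(u => u.name + " is " + u.age).show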
# Convert a Dataset to a DataFrame
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> df.show
+---+------+---+
|age|  name|sex|
+---+------+---+
| 23|Calong|   |
| 21|  Test|   |
| 22|   Cat|   |
+---+------+---+
# Convert an RDD to a Dataset
scala> val rdd = sc.makeRDD(List(Person("Test", 18), Person("Calong", 20), Person("Mike", 21)))
rdd: org.apache.spark.rdd.RDD[Person] = ParallelCollectionRDD[12] at makeRDD at <console>:26

scala> rdd.toDS
res3: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

scala> res3.show
+------+---+
|  name|age|
+------+---+
|  Test| 18|
|Calong| 20|
|  Mike| 21|
+------+---+
# Convert a Dataset to an RDD
scala> val rdd = res3.rdd
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[18] at rdd at <console>:25
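
The resulting RDD contains ordinary Person objects, so the usual RDD API applies; a small sketch:
scala> rdd.map(_.name).collect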

Section 3

# Read Parquet data (spark.read.load defaults to the Parquet format)
scala> val df = spark.read.load("D:/hadoop/spark/spark-3.0.1-bin-hadoop2.7/examples/src/main/resources/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
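
The same file can be read by naming the format explicitly, mirroring the JSON examples in Section 1; a sketch:
scala> val df = spark.read.format("parquet").load("D:/hadoop/spark/spark-3.0.1-bin-hadoop2.7/examples/src/main/resources/users.parquet")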
# Write data out
scala> df.write.json("E:/output")
# Save modes: append / overwrite / ignore / error (the default)
scala> df.write.mode("append").json("E:/output")
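
The same modes are also available as the SaveMode enum, which avoids typos in the mode string; a minimal sketch:
scala> import org.apache.spark.sql.SaveMode
scala> df.write.mode(SaveMode.Overwrite).json("E:/output")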
# Read a CSV file
scala> val df = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("E:/project/python/data/people.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> val df = spark.read.option("sep", ";").option("inferSchema", "true").option("header", "true").csv("E:/project/python/data/people.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> df.show
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
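
Writing CSV mirrors reading it, with options controlling the separator and header row; a sketch (the output path is a placeholder):
scala> df.write.option("sep", ";").option("header", "true").csv("E:/output_csv")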