Spark SQL

Mr.zh Lv3

Spark SQL

1. dataframe类型的详解

2. dataframe的创建

3. dataframe的使用

3.1 使用df中的方法进行操作(dsl方法 — df提供)

1
2
3
4
5
6
7
8
9
10
df数据的查询
1. 指定查询的字段数据(指定字段的字符串格式,结果会返回一个新的df,可以使用df.show()查看
df.select('name')
2. 指定多个字段--一次查询多个字段(将字段放入列表中,注意需要使用字符串的形式)
df.select(['name','age'])
3. 直接使用df进行查询
df.select(df['name'],df['age'])
4. 展示所有数据---> 直接使用df.show() 即可将所有结果查询出来
df.show() # 此时就是对所有字段进行处理
5. df.show(num) 可以指定展示多少条数据,默认是20条数据,同时show()方法不会返回新的df
1
2
3
4
5
6
7
df的条件过滤
1. df.where('age > 20') # 相当于将所有数据都进行过滤,返回一个新的df
2. df.where('age > 20') # 相当于将所有数据都进行过滤,返回一个新的df,并且,没有指定行
相当于得到年龄大于20的全部过滤出来
3. 多个条件的与或非(and , or , )
df.where('age >= 20 and gender = "男"') # 注意单双引号的嵌套使用

1
2
3
4
5
6
7
8
group by(分组操作,一般结合聚合操作)的操作
1. df.groupby('gender').sum('age')
# 相当于对性别进行分组操作,同时对两个组中的年龄进行累加
# select gender , sum('age') from df group by gender
# 常见的聚合函数:sum(),avg() , min() , max()
2. 对于多个分组字段的使用,和进行查询的时候一样,使用一个列表进行
# 相当于先对性别进行一个分组,然后在两个性别中在对每一个科目在进行一个分组
df.groupby(['gender , cls']).avg('age')
1
2
3
分组后的数据过滤
1. # 注意的点就是分组后的过滤,也不需要使用having,而是同样使用where
df.groupby('gender').sum('age').where('sum(age) > 80')
1
2
3
4
5
6
7
8
9
10
11
12
13
排序操作 orderBy()
1. df.orderBy('age')
# 默认是升序排序,按照年龄进行升序排序
# 返回一个新的df
# 如果需要降序,就传入一个参数,进行降序排序
df.orderBy('age',ascending = False)
2. 多字段排序
# 如果需要多个字段进行排序,可以使用列表
# 先按照age进行排序,如果年龄相同,就按照id进行从小到大进行排序
df.orderBy(['age','id'])
3. (多字段是按照一个排序规则进行操作)
注意点就是,不能指定一个字段进行升序,一个降序。也就是说,和sql里面是一样的。

1
2
3
4
5
指定返回数量 limit()
1. # 返回指定数量的数据
# 返回一个新的df
df.limit(5)

3.2 使用sql语句进行操作(sql语句—sparkSession提供)

3.3 关联的操作

1
2
3
4
join的关联操作
1. 内关联
2. 左关联 : 左边的数据全部展示,如果有相同的id,右边表的数据也会展示
3. 右关联

3.4 df数据的缓存和checkpoint

1
2
3
4
5
6
# 缓存使用方法:
# 进行缓存
df.persist()
# 后序在进行计算时,如果计算错误就直接从缓存中读取
# 缓存的级别:默认有限缓存到内存中,内存不足缓存到磁盘上
new_df = df.where('id > 1')
1
2
3
4
5
6
7
8
# checkpoint的作用:
# checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)

# 使用方法 首先设置checkpoint存储的位置
sc.setCheckpointDir('hdfs:///spark_checkpoint')

# 然后进行checkpoint操作:如果计算错误,就可以从hdfs中进行读取
df.checkpoint()
  • 当存在缓存和checkpoint时候,优先读取缓存中的数据,因为缓存的读写速度较快

3.5 df中的内置函数

  1. spark中的内置函数和hive中的内置函数基本一致

  2. 使用之前需要导入相应的模块

    1
    2
    from pyspark.sql import SparkSession,functions as F
    # functions是sparksql中的内置函数模块,里面封装了许多内置方法
1
2
3
4
5
6
7
8
9
10
# 1.字符串的操作  --  拼接  -- 得到一个新的df
new_df = df.select(F.concat('id','name')) # sql:select concat(id , name) from df
new_df.show() # 展示结果,这个方法拼接结果不会有分割字符
# 2.字符串的拼接,并且指定分割字符的方法
new_df = df.select(F.concat_ws(',' , 'id' , 'name'))
new_df.show()
# 3. 字符串的截取
df.select(F.substring('name' , 1 , 4)) # 表示从第一个字符开始,截取4个长度的字符
# 4. 字符串的切割
df.select(F.split('date' , '-')) # 将年份根据-进行切割,得到一个列表
1
2
3
4
5
6
7
8
9
10
11
# 1. 时间操作,获取当前日期
df.select(F.current_date())
# 2. 获取当前日期时间
df.select(F.current_timestamp())
# 3. 获取当前的unix时间(时间戳)
df.select(F.unix_timestamp())
# 4.将unix时间转化为指定格式的时间
df.select(F.from_unixtime('unix_t' , format="yyyy-MM-dd HH:mm:ss"))
# 5. 时间加减操作
df.select(F.date_add('date' , 1)) # 将字段date加一天
df.select(F.date_add('date' , -1)) # 将字段date减一天
1
2
3
4
5
# 内置函数(常用于多个聚合操作) ---  需要配合agg进行使用(agg里面可以使用多个内置函数)
df.groupby('gender').agg(F.sum('age') , F.avg('age'))

# 指定某个字段保留两位小数
df.groupby('gender').agg(F.sum() , F.round(F.avg('age') , 2))
1
2
3
# 内置函数取别名的方法 ---  一般不使用中文的别名哦
# 取别名的操作,并且跟在内置函数后面
df.groupby('gender').agg(F.sum().alias('总和') , F.round(F.avg('age') , 2).alias('平均值'))

3.6 SparkSession的说明

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.sql import SparkSession
# 生成sparksession对象
# 默认是采用本地模式进行计算
ss = SparkSession.builder.getOrCreate()
# 也可以指定其他资源调度方式进行计算 ----->master()
# master('yarn') 采用yarn
# master('spark://node1:7077') 采用standalone
ss1 = SparkSession.builder.master('yarn').getOrCreate()
# 也可以指定计算程序的任务名称 ----->appName()
ss1 = SparkSession.builder.master('yarn').appName('yarn_sparkSql').getOrCreate()
# 指定配置信息 ------->config()
ss1 = SparkSession.builder.master('yarn').config().getOrCreate()

4.小案例—电影数据统计分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 原数据格式,分析,以及导入原数据到hdfs中
# 字段: 用户id\t电影id\t评分\t时间
代码流程如下:
# 导入模块
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# 生成SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取文件数据
sc = ss.sparkContext
# 读取文件生成rdd数据
rdd = sc.textFile('hdfs:///movie')

# 这一步可以先进行查看一部分的数据
print(rdd.take(20)) # 查看20条数据

# 将rdd数据转化为df数据,先将rdd转化为二位嵌套
table_rdd = rdd.map(
lambda x : [int(x.split('\t')[0]) , int(x.split('\t')[1]) , double(x.split('\t')[2]) ,s.split('\t')[3] ] # 注意时间可以使用字符串
)
# 定义schedule信息,指定字段名和字段类型
schema_type = StructType()
.add('userId',IntegerType())
.add('movieId',IntegerType())
.add('score',DoubleType())
.add('unix_time',StringType())
# 转化为df数据
df = table_rdd.toDF(schema_type)
# 查看df数据,默认20条
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 1.查询每个用户的平均分
user_avg = df.groupBy('userId').agg(F.avg('score').alias('avg_data'))
user_avg.show() # 用户打分平均分展示
# 2. 查询每个电影的平均分
movie_avg = df.groupBy('movieId').agg(F.avg('score').alias('avg_data'))
# 计算结果保留两位小数
movie_avg= df.groupBy('movieId').agg(F.round(F.avg('score'),2).alias('avg_data'))
movie_avg.show() # 电影平均分展示
# 3. 查询高分电影中(评分大于3的)打分次数最多的用户,并求出此人打的平均分
3.1 : 得到不同用户打高分电影的数量
df.where('score >3').groupBy('userId')
.agg(F.count('movieId').alias('count_data'))
3.2 :然后根据数量进行降序排序,得到第一个就是打分次数最多的用户
df.where('score >3').groupBy('userId')
.agg(F.count('movieId').alias('count_data'))
.orderBy('count_data',ascending = False)
3.3 :first()方法,取出第一行数据
first()取出的是一个row对象,不在是一个df对象,不能使用show()进行展示哦
user_rdd = df.where('score >3').groupBy('userId')
.agg(F.count('movieId').alias('count_data'))
.orderBy('count_data',ascending = False).first()
print(user_rdd) # 直接进行打印得到一个rdd对象,我们需要的是userId
userId = user_rdd['userId'] # 得到目标用户
print(user_rdd['userId'])
3.4 : 根据用户id查找这个用户所有打分电影的平均分:
# 方法1
user_movie_avg = df.groupBy('userId').agg(F.avg('score').alias('avg_data'))
# 方法2
user_movie_avg = movie_avg.where(movie_avg['userId'] == userId)
# 结果展示
user_movie_avg.show()
1
2
3
# 4. 查询每个用户的平均打分,最低打分,最高打分
# agg()里面可以使用多个内置函数
df.groupBy('userId').agg(F.avg('score') , F.max('score') , F.min('score'))
1
2
3
4
5
# 5. 查询被评分超过100次的电影的平均分,进行排名,取出Top10
df.groupBy('movieId')
.agg(F.count('movieId').alias('count_data') , F.avg('score').alias('avg_data'))
.where('count_data > 100')
.orderBy('avg_data',ascending = False).limit(10)

5. 分区数目(了解)

1
2
3
4
# 调整分区数目
ss = SparkSession.builder.master('yarn')
.config('spark.sql.shuffle.partitions','6')
.getOrCreate()

6. sparkSession读取不同类型文件

6.1 数据读入

注意在读取mysql的数据时候,需要将驱动依赖放到spark/jars/下

6.2 数据读出

  • 将df数据写入到不同的文件下

7. 自定义函数

7.1 函数分类

  • udf 一进一出 可以自定义
  • udaf 多进一出 可以自定义 需要借助pandas
  • udtf 一进多出 不能自定义
  1. 自定义udf函数步骤
    • 数据是一行一行处理(传递一行处理一行)
    • 自定义udf函数也可以使用sql语句的方式进行使用哦

  • 自定义函数也可以使用装饰器的方式进行注册

  • 但这种方式不能使用SQL语句的方式进行使用
  1. 自定义udaf函数步骤

    • 自定义udaf函数的格式

    • 注册方法同上

8.pandas的学习

  1. 定义方式(两种):

    • 值得注意的是pandas中的df和spark中的df不相同哦,pandas中的是单机计算资源,速度较慢

  2. 将pandas中的df转化为spark中的df进行计算

  • Title: Spark SQL
  • Author: Mr.zh
  • Created at : 2024-04-15 17:06:08
  • Updated at : 2024-04-15 17:07:55
  • Link: https://redefine.ohevan.com/2024/04/15/Spark-SQL/
  • License: This work is licensed under CC BY-NC-SA 4.0.