架构智慧
架构智慧
大数据主流架构经验交流分享
  1. 首页
  2. 实时计算
  3. 正文

spark sql中的window函数总结

2020年09月27日 1054点热度 0人点赞 0条评论

在spark中, 对于一个数据集,map 是对每行进行操作,得到一个结果,一对一映射;reduce 则是对多行进行操作,得到一个结果,多对一汇总;而 window 函数则是对多行进行操作,得到多个结果,多行对应多行。 此篇博客会以实例介绍 window 函数的基本概念和用法。

windows窗口函数包含3种:

  1. ranking 排名类
  2. analytic 分析类
  3. aggregate 聚合类
Function TypeSQLDataFrame API
排名类 rankrank
排名类 dense_rankdenseRank
排名类 percent_rankpercentRank
排名类 ntilentile
排名类 row_numberrowNumber
分析类 cume_distcumeDist
分析类 first_valuefirstValue
分析类 last_valuelastValue
分析类 laglag
分析类 leadlead
聚合类 sum sum
聚合类 avg avg
聚合类 max max
聚合类 min min

聚合类函数这里就不再讲述了,主要总结排名类和分析类函数

rank 为相同组的数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为该行在当前组中的行号;因此排名序列会出现间隙

用法:

import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)
val rankByDepname = rank().over(byDepnameSalaryDesc)
empsalary.select('*, rankByDepname as 'rank).show

dense_rank 为相同组内数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为前一行排名值加1;排名序列不会出现间隙

用法

import org.apache.spark.sql.expressions.Window
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)
val ranked = data.withColumn("rank", dense_rank.over(overCategory))
ranked.show

percent_rank 该值的计算公式(组内排名-1)/(组内行数-1),如果组内只有1行,则结果为0

用法

val dataset = spark.range(9).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("percent_rank", percent_rank over byBucket).show

ntile 将组内数据排序然后按照指定的n切分成n个桶,该值为当前行的桶号(桶号从1开始)

用法

val dataset = spark.range(7).select('*, 'id % 3 as "bucket")
import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
dataset.select('*, ntile(3) over byBuckets as "ntile").show

row_number 将组内数据排序后,该值为当前行在当前组内的从1开始的递增的唯一序号值

用法

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("row_number", row_number() over windowSpec).show

cume_dist 该值的计算公式为:组内小于等于当前行值的行数/组内总行数

用法

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("cume_dist", cume_dist over windowSpec).show

lag 计算组内当前行按照排序字段排序的之前offset行的input列的值,如果offset大于当前窗口(组内当前行之前行数)则返回default值,default值默认为null

用法

lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
lag(e: Column, offset: Int, defaultValue: Any): Column
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("lag", lag('id, 1) over windowSpec).show
dataset.withColumn("lag", lag('id, 2, "<default_value>") over windowSpec).show

lead 计算组内当前行按照排序字段排序的之后offset行的input列的值,如果offset大于当前窗口(组内当前行之后行数)则返回default值,default值默认为null

用法

lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("lead", lead('id, 1) over windowSpec).show
dataset.withColumn("lead", lead('id, 2, "<default_value>") over windowSpec).show
本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: spark spark sql window函数
最后更新:2020年09月27日

hivefans

保持饥渴的专注,追求最佳的品质

点赞
< 上一篇
下一篇 >

文章评论

取消回复
分类目录
  • 大数据浪潮 (2)
  • 实时数仓 (1)
  • 实时计算 (6)
  • 离线计算 (4)
2020年9月
一 二 三 四 五 六 日
 123456
78910111213
14151617181920
21222324252627
282930  
« 8月   7月 »
文章归档
  • 2021年7月 (2)
  • 2020年9月 (4)
  • 2020年8月 (4)
  • 2020年7月 (2)
  • 2020年6月 (1)
标签聚合
实时计算 窗口函数 dataframe elasticsearch hive flink kafka spark
友情链接
  • 大数据导航
  • 网站地图
  • 隐私政策

COPYRIGHT © 2020 架构智慧. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS

京ICP备19056408号