在spark中, 对于一个数据集,map
是对每行进行操作,得到一个结果,一对一映射;reduce
则是对多行进行操作,得到一个结果,多对一汇总;而 window
函数则是对多行进行操作,得到多个结果,多行对应多行。 此篇博客会以实例介绍 window
函数的基本概念和用法。

windows窗口函数包含3种:
- ranking 排名类
- analytic 分析类
- aggregate 聚合类
Function Type | SQL | DataFrame API |
---|---|---|
排名类 | rank | rank |
排名类 | dense_rank | denseRank |
排名类 | percent_rank | percentRank |
排名类 | ntile | ntile |
排名类 | row_number | rowNumber |
分析类 | cume_dist | cumeDist |
分析类 | first_value | firstValue |
分析类 | last_value | lastValue |
分析类 | lag | lag |
分析类 | lead | lead |
聚合类 | sum | sum |
聚合类 | avg | avg |
聚合类 | max | max |
聚合类 | min | min |
聚合类函数这里就不再讲述了,主要总结排名类和分析类函数
rank 为相同组的数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为该行在当前组中的行号;因此排名序列会出现间隙
用法:
import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)
val rankByDepname = rank().over(byDepnameSalaryDesc)
empsalary.select('*, rankByDepname as 'rank).show
dense_rank 为相同组内数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为前一行排名值加1;排名序列不会出现间隙
用法
import org.apache.spark.sql.expressions.Window
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)
val ranked = data.withColumn("rank", dense_rank.over(overCategory))
ranked.show
percent_rank 该值的计算公式(组内排名-1)/(组内行数-1)
,如果组内只有1行,则结果为0
用法
val dataset = spark.range(9).withColumn("bucket", 'id % 3)
import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("percent_rank", percent_rank over byBucket).show
ntile 将组内数据排序然后按照指定的n切分成n个桶,该值为当前行的桶号(桶号从1开始)
用法
val dataset = spark.range(7).select('*, 'id % 3 as "bucket")
import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
dataset.select('*, ntile(3) over byBuckets as "ntile").show
row_number 将组内数据排序后,该值为当前行在当前组内的从1开始的递增的唯一序号值
用法
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("row_number", row_number() over windowSpec).show
cume_dist 该值的计算公式为:组内小于等于当前行值的行数/组内总行数
用法
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("cume_dist", cume_dist over windowSpec).show
lag 计算组内当前行按照排序字段排序的之前offset行的input列的值,如果offset大于当前窗口(组内当前行之前行数)则返回default值,default值默认为null
用法
lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
lag(e: Column, offset: Int, defaultValue: Any): Column
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("lag", lag('id, 1) over windowSpec).show
dataset.withColumn("lag", lag('id, 2, "<default_value>") over windowSpec).show
lead 计算组内当前行按照排序字段排序的之后offset行的input列的值,如果offset大于当前窗口(组内当前行之后行数)则返回default值,default值默认为null
用法
lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
dataset.withColumn("lead", lead('id, 1) over windowSpec).show
dataset.withColumn("lead", lead('id, 2, "<default_value>") over windowSpec).show
文章评论