架构智慧
架构智慧
大数据主流架构经验交流分享
  1. 首页
  2. 离线计算
  3. 正文

使用spark窗口函数解决复杂条件场景问题

2020年08月11日 903点热度 0人点赞 0条评论

在工作或者学习中难免会遇到各种复杂需求的项目,有的是跟时间相关,有的跟距离相关等。

场景1 交通轨迹问题

数据有terminal_id,city,up_time三列。目标是统计car终端每次经过一个city的时间段;不是经过每个city的总时间。每个时间段以当地的up_time为准。

数据实例

    car1, 北京, 03-15 11:49:16
    car1, 北京, 03-15 12:50:38
    car1, 天津, 03-15 14:10:35
    car1, 天津, 03-15 19:20:47
    car1, 河北, 03-16 13:20:28
    car1, 河北, 03-16 15:20:27

需要实现获取的结果为

car1, 北京, 1.0小时
car1, 天津, 5.17小时
car1, 河北, 2小时

实现代码

###pyspark实现代码

 select_df = df_all.sort('terminal_id', 'up_time')\
                       .dropna(subset=['city','up_time'])
    carWindow = Window.partitionBy("terminal_id").orderBy("up_time")
    # 获取前一个city,窗口函数lag()默认取前1条的记录
    # 然后判断当前city与前一条的不同(或者pre_city为空),标记为switch=1,否则switch=0。
    split_df = select_df.withColumn("pre_city", F.lag("city").over(carWindow))\
                        .withColumn("switch", F.when(F.col("city")!=F.col("pre_city"), 1)\
                                            .when(F.isnull(F.col("pre_city")), 1).otherwise(0))
   
    # 对switch在carWindow中累加(不是求和),得到idx列,表示每个car第几次切换city。
    # 之后,对[idx, terminal_id, city]分组求时间差,就是每个session的时长。
    sess_df = split_df.withColumn("idx", F.sum("switch").over(carWindow))\
                      .groupBy('idx', 'terminal_id', 'city')\
                      .agg(F.max("up_time").alias("max_time"), F.min("up_time").alias("min_time"))
    # 时间差转小时,保留3位小数
    timeDiff = (F.unix_timestamp('max_time') - F.unix_timestamp('min_time'))/3600
    stay_df = sess_df.withColumn('stay_hour', F.round(timeDiff.cast(DoubleType()), 3))\
                        .drop('max_time', 'min_time')

场景2 计算网站用访问用户的留存时间

例如有一个网站的用户登录数据如下

| user_name|login_date|
+----------+----------+
|zhangsan |2012-01-04|
|lisi |2012-01-04|
|lisi |2012-01-06|
| wangwu |2012-01-10|
|zhangsan |2012-01-11|

需求是数据中添加一列,说明他们何时成为网站上的活动用户。但是有一个时间范围要求:在一段时间内,用户被认为是活动的,在这段时间之后,如果他们再次登录,他们的生效日期将重置。假设这段时间是5天。那么,从上表生成的结果表如下所示:

| user_name|login_date|became_active|
+----------------+----------+-------------+
|zhangsan|2012-01-04| 2012-01-04|
|lisi|2012-01-04| 2012-01-04|
|lisi|2012-01-06| 2012-01-04|
| wangwu|2012-01-10| 2012-01-10|
|zhangsan|2012-01-11| 2012-01-11|

使用scala实现代码如下

package com.empgo

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
import org.apache.spark.sql.SparkSession

object Demo {
  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder.master("local[4]").getOrCreate
    import spark.implicits._


    val df = Seq(
      ("zhangsan", "2012-01-04",100), ("lisi", "2012-01-04",200),
      ("lisi", "2012-01-06",1300), ("wangwu", "2012-01-10",100),
      ("zhangsan", "2012-01-11",200), ("zhangsan", "2012-01-14",1400),
      ("zhangsan", "2012-08-11",100)
    ).toDF("user_name", "login_date","distance")


    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    val userSessionWindow = Window.partitionBy("user_name", "session")
    val newSession =  (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)
    ) > 5).cast("bigint")
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")
    result.show()
  }
}

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: spark 窗口函数
最后更新:2020年08月12日

hivefans

保持饥渴的专注,追求最佳的品质

点赞
< 上一篇
下一篇 >

文章评论

取消回复
分类目录
  • 大数据浪潮 (2)
  • 实时数仓 (1)
  • 实时计算 (6)
  • 离线计算 (4)
2020年8月
一 二 三 四 五 六 日
 12
3456789
10111213141516
17181920212223
24252627282930
31  
« 7月   9月 »
文章归档
  • 2021年7月 (2)
  • 2020年9月 (4)
  • 2020年8月 (4)
  • 2020年7月 (2)
  • 2020年6月 (1)
标签聚合
kafka flink elasticsearch 实时计算 spark 窗口函数 dataframe hive
友情链接
  • 大数据导航
  • 网站地图
  • 隐私政策

COPYRIGHT © 2020 架构智慧. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS

京ICP备19056408号