在工作或者学习中难免会遇到各种复杂需求的项目,有的是跟时间相关,有的跟距离相关等。

场景1 交通轨迹问题
数据有terminal_id,city,up_time三列。目标是统计car终端每次经过一个city的时间段;不是经过每个city的总时间。每个时间段以当地的up_time为准。
数据实例
car1, 北京, 03-15 11:49:16
car1, 北京, 03-15 12:50:38
car1, 天津, 03-15 14:10:35
car1, 天津, 03-15 19:20:47
car1, 河北, 03-16 13:20:28
car1, 河北, 03-16 15:20:27
需要实现获取的结果为
car1, 北京, 1.0小时
car1, 天津, 5.17小时
car1, 河北, 2小时
实现代码
###pyspark实现代码
select_df = df_all.sort('terminal_id', 'up_time')\
.dropna(subset=['city','up_time'])
carWindow = Window.partitionBy("terminal_id").orderBy("up_time")
# 获取前一个city,窗口函数lag()默认取前1条的记录
# 然后判断当前city与前一条的不同(或者pre_city为空),标记为switch=1,否则switch=0。
split_df = select_df.withColumn("pre_city", F.lag("city").over(carWindow))\
.withColumn("switch", F.when(F.col("city")!=F.col("pre_city"), 1)\
.when(F.isnull(F.col("pre_city")), 1).otherwise(0))
# 对switch在carWindow中累加(不是求和),得到idx列,表示每个car第几次切换city。
# 之后,对[idx, terminal_id, city]分组求时间差,就是每个session的时长。
sess_df = split_df.withColumn("idx", F.sum("switch").over(carWindow))\
.groupBy('idx', 'terminal_id', 'city')\
.agg(F.max("up_time").alias("max_time"), F.min("up_time").alias("min_time"))
# 时间差转小时,保留3位小数
timeDiff = (F.unix_timestamp('max_time') - F.unix_timestamp('min_time'))/3600
stay_df = sess_df.withColumn('stay_hour', F.round(timeDiff.cast(DoubleType()), 3))\
.drop('max_time', 'min_time')
场景2 计算网站用访问用户的留存时间
例如有一个网站的用户登录数据如下
| user_name|login_date|
+----------+----------+
|zhangsan |2012-01-04|
|lisi |2012-01-04|
|lisi |2012-01-06|
| wangwu |2012-01-10|
|zhangsan |2012-01-11|
需求是数据中添加一列,说明他们何时成为网站上的活动用户。但是有一个时间范围要求:在一段时间内,用户被认为是活动的,在这段时间之后,如果他们再次登录,他们的生效日期将重置。假设这段时间是5天。那么,从上表生成的结果表如下所示:
| user_name|login_date|became_active|
+----------------+----------+-------------+
|zhangsan|2012-01-04| 2012-01-04|
|lisi|2012-01-04| 2012-01-04|
|lisi|2012-01-06| 2012-01-04|
| wangwu|2012-01-10| 2012-01-10|
|zhangsan|2012-01-11| 2012-01-11|
使用scala实现代码如下
package com.empgo
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
import org.apache.spark.sql.SparkSession
object Demo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local[4]").getOrCreate
import spark.implicits._
val df = Seq(
("zhangsan", "2012-01-04",100), ("lisi", "2012-01-04",200),
("lisi", "2012-01-06",1300), ("wangwu", "2012-01-10",100),
("zhangsan", "2012-01-11",200), ("zhangsan", "2012-01-14",1400),
("zhangsan", "2012-08-11",100)
).toDF("user_name", "login_date","distance")
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
val newSession = (coalesce(
datediff($"login_date", lag($"login_date", 1).over(userWindow)),
lit(0)
) > 5).cast("bigint")
val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
val result = sessionized
.withColumn("became_active", min($"login_date").over(userSessionWindow))
.drop("session")
result.show()
}
}
文章评论