In most projects that read Kafka with Spark Streaming, the data in Kafka is sent as JSON. A slightly trickier case is JSON that nests an array of JSON objects. Two parsing approaches are summarized below.
JSON data format
{"terminalId":4501109,"gps":[{"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52},{"move":2,"distance":22.65,"gpsId":0,"direct":236.65,"lon":112.581139,"terminalId":4501109,"speed":82.0,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854137000,"lat":23.031427,"height":12.99},{"move":2,"distance":22.77,"gpsId":0,"direct":236.06,"lon":112.580765,"terminalId":4501109,"speed":82.98,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854139000,"lat":23.031197,"height":12.47},{"move":2,"distance":21.41,"gpsId":0,"direct":236.95,"lon":112.580406,"terminalId":4501109,"speed":79.32,"acceleration":0.0,"satelliteNum":24,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854141000,"lat":23.030968,"height":12.03}]}
Method 1: parse with json4s's default formats
The implementation is as follows:
package com.empgo
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object Demo {
  // Case classes mirroring the JSON payload; fields present in the JSON but not
  // declared here (move, cmdId, online) are ignored by json4s during extraction
  case class gpslog(distance: Double, gpsId: Int, direct: Double, lon: Double, terminalId: Long,
                    speed: Double, acceleration: Double, satelliteNum: Int,
                    sysTime: Long, time: Long, lat: Double, height: Double)
  case class log(terminalId: Long, gps: List[gpslog])
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ecargps")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "for_gps_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("ecar-photo-gps")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => record.value)
      .map(value => {
        // Implicit conversion: use json4s's default formats for extraction
        implicit val formats: DefaultFormats.type = DefaultFormats
        val json = parse(value)
        // Extract the parsed JSON into the case class
        json.extract[log]
      })
      .flatMap(x => x.gps) // one record per element of the gps array
      .window(Seconds(5), Seconds(5))
      .foreachRDD(rdd => {
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        // gpslog is a case class, so the DataFrame schema is derived by reflection
        val df = spark.createDataFrame(rdd)
        df.show(10, false)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
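To sanity-check the extraction without a Kafka cluster, the same case classes can be exercised directly on the sample payload. A minimal sketch (the object name JsonExtractDemo and the shortened one-element payload are illustrative, not part of the original code):

import org.json4s._
import org.json4s.jackson.JsonMethods._

object JsonExtractDemo {
  case class gpslog(distance: Double, gpsId: Int, direct: Double, lon: Double, terminalId: Long,
                    speed: Double, acceleration: Double, satelliteNum: Int,
                    sysTime: Long, time: Long, lat: Double, height: Double)
  case class log(terminalId: Long, gps: List[gpslog])

  def main(args: Array[String]): Unit = {
    // One-element gps array, shortened from the sample payload at the top of the article
    val sample = """{"terminalId":4501109,"gps":[{"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52}]}"""
    implicit val formats: DefaultFormats.type = DefaultFormats
    // Extra JSON fields (move, cmdId, online) are silently ignored by extract
    val parsed = parse(sample).extract[log]
    parsed.gps.foreach(g => println(s"terminal=${g.terminalId} time=${g.time} lat=${g.lat} lon=${g.lon}"))
  }
}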
Method 2: parse with spark.read.json
Core code:
stream.map(record => record.value())
  .window(Seconds(5), Seconds(5))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      import org.apache.spark.sql.functions.explode
      // spark.read.json(RDD[String]) is deprecated; go through a Dataset[String] instead
      val df = spark.read.json(spark.createDataset(rdd))
      // explode turns each element of the gps array into its own row
      val explodedf = df.select(explode($"gps")).toDF("gps")
      val gps_df = explodedf.select("gps.terminalId", "gps.time", "gps.lat", "gps.lon")
      gps_df.show()
    }
  })
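The explode/select pipeline can likewise be tried locally without Kafka by feeding the sample JSON through a Dataset[String]. A minimal sketch (the object name ReadJsonDemo, the local[2] master, and the shortened payload are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ReadJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("read-json-demo").getOrCreate()
    import spark.implicits._

    // Same shape as the Kafka payload, shortened to one gps element
    val sample = Seq("""{"terminalId":4501109,"gps":[{"distance":23.3,"gpsId":0,"lon":112.581512,"terminalId":4501109,"speed":83.12,"time":1589854135000,"lat":23.031654}]}""")
    val df = spark.read.json(spark.createDataset(sample))

    // One row per gps element; rename explode's default "col" column to "gps"
    val exploded = df.select(explode($"gps")).toDF("gps")
    exploded.select("gps.terminalId", "gps.time", "gps.lat", "gps.lon").show(false)

    spark.stop()
  }
}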
Dependency configuration (build.sbt)
name := "scaladev"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.0-cdh6.3.2"
libraryDependencies ++= Seq(
  "joda-time" % "joda-time" % "2.3",
  "org.joda" % "joda-convert" % "1.6",
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "mysql" % "mysql-connector-java" % "5.1.6"
)
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13-cloudera.1"