
Environment
- zeppelin 0.8.2
- cloudera 6.3.2
- spark 2.4.0-cdh6.3.2
- kafka 2.2.1
Installing and configuring Zeppelin
1. Install the Cloudera agent on the node where Zeppelin will run, then add the Spark gateway and Hive gateway client roles to that node.
2. Extract zeppelin-0.8.2-bin-all.tgz into the /data directory:
tar zxvf zeppelin-0.8.2-bin-all.tgz -C /data
3. Go into the zeppelin-0.8.2-bin-all conf directory and create the configuration files from the templates:
cd /data/zeppelin-0.8.2-bin-all/conf
cp zeppelin-env.sh.template zeppelin-env.sh
cp shiro.ini.template shiro.ini
cp zeppelin-site.xml.template zeppelin-site.xml
Edit zeppelin-env.sh and add the following settings:
export JAVA_HOME=/opt/jdk
export HADOOP_CONF_DIR=/etc/hadoop/conf
export ZEPPELIN_INTP_CLASSPATH_OVERRIDES=/etc/hive/conf
Edit zeppelin-site.xml:
- Set the value of zeppelin.server.addr to 0.0.0.0 or the host's hostname.
- Set the value of zeppelin.interpreter.dep.mvnRepo to https://mirrors.huaweicloud.com/repository/maven/
- To disable anonymous login, set the value of zeppelin.anonymous.allowed to false.
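For reference, these settings live in Hadoop-style property blocks in zeppelin-site.xml; the entries for the values above look roughly like this:
<property>
  <name>zeppelin.server.addr</name>
  <value>0.0.0.0</value>
</property>
<property>
  <name>zeppelin.interpreter.dep.mvnRepo</name>
  <value>https://mirrors.huaweicloud.com/repository/maven/</value>
</property>
<property>
  <name>zeppelin.anonymous.allowed</name>
  <value>false</value>
</property>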
Edit shiro.ini to add an administrator user and the admin group.
The format is username = password, role:
root = root, admin
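In shiro.ini this line belongs in the [users] section, so the relevant part of the file ends up looking like the sketch below (root/root is just the example credential from above; use a stronger password in practice):
[users]
# username = password, role
root = root, admin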
After the configuration files are saved, start the Zeppelin service:
bin/zeppelin-daemon.sh start
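The same script also accepts a status argument, which is a quick way to confirm the daemon came up:
bin/zeppelin-daemon.sh status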
Configuring the Zeppelin Spark interpreter
After the Zeppelin service starts, open http://IP:8080 in a browser (IP is the address of the node where Zeppelin is installed).
Click login in the top-right corner and sign in with the user and password configured above. After logging in, create a notebook first: click "Create new note" and then open the newly created note.
Click the username in the top-right corner, choose Interpreter, search for "spark" in the search box to find the Spark interpreter, and click edit to configure it.
In Properties:
- add SPARK_HOME with the value /opt/cloudera/parcels/CDH/lib/spark
- change master to yarn-client
- spark.cores.max: 20
- spark.executor.memory (memory per executor)
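After saving the interpreter settings, a quick sanity check in a notebook paragraph is to print the Spark version and master (sc is provided by the Spark interpreter); with the properties above you should see the CDH Spark build and a YARN master:
%spark
// verify the interpreter picked up SPARK_HOME and the yarn-client master
sc.version
sc.master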
Because Zeppelin 0.8.2 has a compatibility issue with Cloudera 6, two artifacts need to be added under Dependencies:
/opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar
/opt/cloudera/parcels/CDH/jars/libthrift-0.9.3.jar
Also delete libthrift-0.9.2.jar from lib/interpreter under the Zeppelin installation directory (zeppelin-0.8.2-bin-all/lib/interpreter). This resolves the jar conflict error:
java.lang.NoSuchMethodError: com.facebook.fb303.FacebookService$Client.sendBaseOneway
After making these changes, restart the Zeppelin service.
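Assuming the installation path from step 2 (/data/zeppelin-0.8.2-bin-all), the removal and restart look like this:
rm /data/zeppelin-0.8.2-bin-all/lib/interpreter/libthrift-0.9.2.jar
/data/zeppelin-0.8.2-bin-all/bin/zeppelin-daemon.sh restart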
Run the following code in a notebook:
val sqlDF = spark.sql("select * from default.test limit 10");
sqlDF.show();
Click run and the rows from the Hive table are displayed.
Integrating Kafka for real-time computation
Configure the Spark Streaming and Kafka dependencies
Edit the Spark interpreter again and add three artifacts under Dependencies:
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0-cdh6.3.2
org.apache.spark:spark-streaming_2.11:2.4.0-cdh6.3.2
org.apache.spark:spark-sql_2.11:2.4.0-cdh6.3.2
Paste the following code into a notebook to consume Kafka data and run real-time analysis:
%spark
sc.version
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, _}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(10)))
/*def processEngine(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(5))
ssc
}*/
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "for_gps_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("ecar-photo-gps")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => record.value())
  .window(Seconds(10), Seconds(10))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      // parse each Kafka message (a JSON string) into a DataFrame
      val df = spark.read.json(rdd)
      // explode the gps array so that each GPS point becomes one row
      val explodedf = df.select(explode($"gps")).toDF("gps")
      val gps_df = explodedf.select("gps.terminalid", "gps.time", "gps.lat", "gps.lon")
      // for each terminal, attach the previous point's timestamp with a lag window
      val window = Window.partitionBy("terminalid").orderBy("time")
      val lagCol = lag(col("time"), 1).over(window)
      gps_df.withColumn("beforetime", lagCol).show()
    }
  })
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext=false, stopGracefully=true)
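Note that ssc.awaitTermination() blocks the paragraph, so the stop call above is only reached once the streaming context terminates. To shut the stream down cleanly from a separate notebook paragraph, one option is StreamingContext.getActive, which returns the currently running context if there is one:
%spark
// stop the running StreamingContext (if any) without stopping the shared SparkContext
import org.apache.spark.streaming.StreamingContext
StreamingContext.getActive.foreach(_.stop(stopSparkContext = false, stopGracefully = true))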