最近在使用spark开发的过程中,遇到了一个空指针错误,spark处理数据的过程主要是 spark读取数据库数据做清洗转换,然后需要把数据和数据库中另一个表做匹配查询,查询到结果后存到hdfs.
因为数据量比较大,所以读出数据后,使用了foreachpartition在每个分区做处理优化,打算在分区中匹配数据后转换为dataframe,直接保存到hive,结果调试的过程中遇到了空指针错误,主要代码如下
var propdb = new java.util.Properties
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.jdbc("jdbc:url", "demo", propdb )
df.foreachpartition(partition => {
val gpsArr = new Arraybuffer(Gps)
partition.foreach(data => {
gpsArr.append(data)
})
gpsList.toList.toDF("id","name")
})
结果运行调试的时候报告错误

google搜索了一下问题,找到答案
空指针异常的原因是因为dataframe,rdd或dataset只能存在于driver上,不能在worker节点创建
解决方法: 写入hdfs文件的话,可以使用stream流方式逐行写入
var propdb = new java.util.Properties
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.jdbc("jdbc:url", "demo", propdb )
val path = new Path(hdfsPath + System.currentTimeMillis())
// 创建一个HDFS outputStream流
val outputStream = if (fs.exists(path)){
fs.append(path)
}else{
fs.create(path)
}
df.foreachpartition(partition => {
partition.foreach(message => outputStream.write((message + "\n").getBytes("UTF-8")))
})
outputStream.close()
})
文章评论