本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将hive数据导入到 elasticsearch 中,不需要编写代码只需要 Hive SQL 实现对数据的ETL

一、开发环境
1、组件版本
- CDH 集群版本:6.3.2
- ES 版本:7.7.0
- Hive 版本:2.1.1
- ES-Hadoop 版本:7.7.0
2、配置Hive 支持 ES-Hadoop
一共有四个方法可以配置,可以任选一个进行安装配置
(1)使用 add jar
在hive命令行或者hive sql语句执行
add jar /path/elasticsearch-hadoop-7.7.0.jar;
add jar /path/commons-httpclient-3.1.jar;
(2) 使用 hive.aux.jars.path
$ bin/hive --auxpath=/path/ elasticsearch-hadoop-7.7.0.jar, /path/commons-httpclient-3.1.jar
(3) 修改配置(hive-site.xml
)
增加配置
<property>
<name>hive.aux.jars.path</name>
<value>/path/elasticsearch-hadoop-7.7.0.jar,/path/commons-httpclient-3.1.jar</value>
<description>A comma separated list (with no spaces) of the jar files</description>
</property>
(4) CDH6.X 推荐的安装方法
将 elasticsearch-hadoop-7.7.0.jar commons-httpclient-3.1.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可
二、创建hive与elasticsearch交互的外部表导入数据
1、创建外部表 实现hive与elasticsearch交互
drop table if exists test;
create table test(key string,value string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'testindex',
'es.nodes'='192.168.1.100',
'es.net.http.auth.user'='demo',
'es.net.http.auth.pass'='demo',
'es.port'='9200',
'es.nodes.wan.only'='true');
配置项说明
es.resource 用于设置 elasticsearch的索引名称,默认该配置项同时设置了读和写的索引
es.nodes 设置ES的节点
es.net.http.auth.user es.net.http.auth.pass 设置ES认证的用户密码
es.port 设置ES服务端口
es.nodes.wan.only 默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes
声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
es.nodes.discovery
:默认为 true,表示自动发现集群可用节点;
es.index.auto.create
:是否自动创建不存在的索引,默认为 true;
es.mapping.names
用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。
2、将hive中的表数据导入到外部表
#### 禁用hive的推测执行
SET hive.mapred.reduce.tasks.speculative.execution = false;
SET mapreduce.map.speculative = false;
SET mapreduce.reduce.speculative = false;
insert overwrite table test select * from test1;
hive sql作业执行完成后在elasticsearch中即可查询到hive导入的数据
3、导入数据过程中遇到的坑
在使用 Hive 向 ElasticSearch 中导出数据的时候,有一个大坑非常关键:并发!
这种导入数据的方法会启动 mapper 来执行,但是如果 mapper 数过高,ElasticSearch 过载(Overload)或者多出冗余的数据。据分析,原因是 mapper 会批量地将数据插入 ElasticSearch 中,而当 mapper 失败的时候会被重启,已经导入的数据又会重新插入 ElasticSearch 导致数据重复。
其中的一个方法就是适当减少并发的 mapper 的数量。通过合并文件的手段可以降低 mapper 的数量。
SET mapred.max.split.size=1000000000;
SET mapred.min.split.size=1000000000;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
另外,还可以通过设置 ElasticSearch 的 _id 字段避免重复。比如为每一行生成一个 uuid,把 uuid 作为 mappings 中的 _id。ElasticSearch 对于 uuid 相同的记录会覆盖旧的记录,这样相同 uuid 的记录就不会重复了。
drop table if exists test;
create table test(key string,value string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'testindex',
'es.nodes'='192.168.1.100',
'es.net.http.auth.user'='demo',
'es.net.http.auth.pass'='demo',
'es.port'='9200',
'es.mapping.id' = 'uuid',
'es.nodes.wan.only'='true');
shell脚本完整代码
####### execute hive ######
TEMP_TABLE="temp"
today=`date +%Y%m%d`
sql=$(cat <<!EOF
USE default;
drop table if exists ${TEMP_TABLE};
create table ${TEMP_TABLE}(key string,value string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'testindex',
'es.nodes'='192.168.1.100',
'es.net.http.auth.user'='demo',
'es.net.http.auth.pass'='demo',
'es.port'='9200',
'es.mapping.id' = 'uuid',
'es.nodes.wan.only'='true');
!EOF
)
############ execute begin ###########
echo $sql
/usr/bin/hive -e "$sql"
exitCode=$?
if [ $exitCode -ne 0 ];then
echo "[ERROR] hive execute failed!"
exit $exitCode
fi
参考资料
文章评论