Flume / Kafka / Zookeeper Integration Installation Manual
一、JDK Installation
1.1 Download JDK 1.8
Download jdk-8u101-linux-x64.tar.gz from the Oracle website.
First locate and remove the bundled OpenJDK:
rpm -qa | grep openjdk
yum -y remove *openjdk*
Extract jdk-8u101-linux-x64.tar.gz into the installation directory. Unless stated otherwise, the installation directory throughout this document is /data/soft/.
mv jdk-8u101-linux-x64.tar.gz /data/soft/
cd /data/soft
tar zxvf jdk-8u101-linux-x64.tar.gz
mv jdk1.8.0_101 java
Configure JAVA_HOME:
echo '## java configuration ' >> /etc/profile
echo 'export JAVA_HOME=/data/soft/java' >> /etc/profile
echo 'export PATH=.:$JAVA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
Verify the installation:
java -version
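If the installation succeeded, the output should look roughly like the following (build details may differ):
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)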
二、Zookeeper Installation
2.1 Zookeeper Download
Download zookeeper-3.4.9.tar.gz from the Zookeeper website or with wget.
2.2 Zookeeper Installation and Configuration
Move zookeeper-3.4.9.tar.gz into the installation directory and extract it:
mv zookeeper-3.4.9.tar.gz /data/soft/
cd /data/soft
tar zxvf zookeeper-3.4.9.tar.gz
mv zookeeper-3.4.9 zookeeper
Edit and save zoo.cfg with the following configuration:
vi /data/soft/zookeeper/conf/zoo.cfg
# rewrite with the following configuration
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/soft/zookeeper
clientPort=2181
server.1=collector1:2888:3999
server.2=collector2:2888:3999
server.3=collector3:2888:3999
Edit the hosts file so that the server.* hostnames resolve:
vi /etc/hosts
# add the cluster hostname mappings
10.20.26.184 collector1
10.20.26.185 collector2
10.20.26.186 collector3
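The same hostname mappings are needed on every node in the cluster; assuming passwordless SSH as root (the same assumption the distribution commands in section 五 make), the file can be copied out like this:
for i in {2..3};do scp /etc/hosts root@collector$i:/etc/hosts;done;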
Create a file named myid under dataDir containing the number of this node's server.* entry:
echo '1'>/data/soft/zookeeper/myid
Add the Zookeeper environment variables:
echo '## zk configuration ' >> /etc/profile
echo 'export ZOOKEEPER_HOME=/data/soft/zookeeper' >> /etc/profile
echo 'export PATH=$ZOOKEEPER_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
Start Zookeeper and check its status; the output shows whether the node is a leader or a follower:
zkServer.sh start
zkServer.sh status
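Note: while only this first node is running, zkServer.sh status will report that it cannot reach a quorum; once the other two nodes are up (see section 五), the status output contains a line such as:
Mode: leader
or
Mode: follower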
三、Kafka Installation
3.1 Kafka Download
Download kafka_2.10-0.10.1.1.tgz from the Kafka website or with wget.
3.2 Kafka Installation and Configuration
Move and extract it into the installation directory /data/soft:
mv /root/kafka_2.10-0.10.1.1.tgz /data/soft/
cd /data/soft
tar zxvf kafka_2.10-0.10.1.1.tgz
mv kafka_2.10-0.10.1.1 kafka
Edit the configuration file:
vi /data/soft/kafka/config/server.properties
# replace with
broker.id=1
auto.create.topics.enable=true
delete.topic.enable=true
default.replication.factor=2
num.network.threads=3
num.io.threads=8
num.partitions=3
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/soft/kafka/logs
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
zookeeper.connect=collector1:2181,collector2:2181,collector3:2181
Create the log directory and grant read/write permissions:
mkdir -p /data/soft/kafka/logs
chmod -R 777 /data/soft/kafka/logs
Add the Kafka environment variables:
echo '## kafka configuration ' >> /etc/profile
echo 'export KAFKA_HOME=/data/soft/kafka' >> /etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
Adjust the Kafka memory settings:
vi /data/soft/kafka/bin/kafka-server-start.sh
# insert near the top; raise the values if memory allows
export KAFKA_HEAP_OPTS="-Xmx3G -Xms1G"
#wq
Start Kafka and check its status:
kafka-server-start.sh -daemon /data/soft/kafka/config/server.properties
kafka-console-producer.sh --broker-list collector1:9092,collector2:9092,collector3:9092 --topic demo_test
kafka-topics.sh --list --zookeeper collector1:2181,collector2:2181,collector3:2181
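To confirm that messages typed into the console producer actually arrive, a console consumer can be run in another terminal (a minimal check; note that with default.replication.factor=2 the demo_test topic can only be created once at least two brokers are online, i.e. after the distribution in section 五):
kafka-console-consumer.sh --zookeeper collector1:2181,collector2:2181,collector3:2181 --from-beginning --topic demo_test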
四、Flume Installation
4.1 Flume Download
Download apache-flume-1.7.0-bin.tar.gz from the Flume website or with wget.
If you need to modify the source code, also download the source package apache-flume-1.7.0-src.tar.gz.
Flume requires JDK 1.7 or later, which is not covered again here.
4.2 Flume Installation and Configuration
Move apache-flume-1.7.0-bin.tar.gz into the installation directory and extract it:
mkdir -p /data/soft
mv apache-flume-1.7.0-bin.tar.gz /data/soft
cd /data/soft
tar zxvf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume
Configure the FLUME_HOME environment variable:
echo '## flume configuration ' >> /etc/profile
echo 'export FLUME_HOME=/data/soft/flume' >> /etc/profile
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
Check the version; if it prints correctly, the installation succeeded. Then build a source > channel > sink configuration for your own business; collect-conf.properties is used as the example here.
mkdir -p /data/flume/pad
mkdir -p /data/flume/adx
flume-ng version
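If the installation is correct, the version command prints the release on its first line, followed by the source repository, revision, and compile information:
Flume 1.7.0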
#create properties
vi /data/soft/flume/conf/collect-conf.properties
#wq
The configuration content is as follows:
############################################
# producer config
###########################################
#*****************agent section**********************
producer.sources = source_pad source_adx
producer.channels = channel_pad channel_adx
producer.sinks = sink_pad sink_adx
#*****************source section**********************
#pad source section
producer.sources.source_pad.type = spooldir
producer.sources.source_pad.channels = channel_pad
producer.sources.source_pad.fileHeader = true
producer.sources.source_pad.fileHeaderKey=PAD
producer.sources.source_pad.spoolDir=/data/flume/pad
producer.sources.source_pad.batchSize=1000
producer.sources.source_pad.bufferMaxLineLength=10000
#adx source section
producer.sources.source_adx.type = spooldir
producer.sources.source_adx.channels = channel_adx
producer.sources.source_adx.fileHeader = true
producer.sources.source_adx.fileHeaderKey=ADX
producer.sources.source_adx.spoolDir=/data/flume/adx
producer.sources.source_adx.batchSize=1000
producer.sources.source_adx.bufferMaxLineLength=10000
#*****************sink section**********************
#pad sink section
producer.sinks.sink_pad.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.sink_pad.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092
producer.sinks.sink_pad.kafka.flumeBatchSize=1000
# acks: 1 = leader only, 0 = none, -1 = all replicas
producer.sinks.sink_pad.kafka.producer.acks=1
producer.sinks.sink_pad.kafka.producer.type=sync
producer.sinks.sink_pad.kafka.topic=pad_report_data
producer.sinks.sink_pad.kafka.producer.compression.type = snappy
producer.sinks.sink_pad.kafka.producer.linger.ms=1
producer.sinks.sink_pad.channel = channel_pad
#adx sink section
producer.sinks.sink_adx.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.sink_adx.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092
producer.sinks.sink_adx.kafka.flumeBatchSize=200
# acks: 1 = leader only, 0 = none, -1 = all replicas
producer.sinks.sink_adx.kafka.producer.acks=1
producer.sinks.sink_adx.kafka.producer.type=sync
producer.sinks.sink_adx.kafka.topic=adx_report_data
producer.sinks.sink_adx.kafka.producer.compression.type = snappy
producer.sinks.sink_adx.kafka.producer.linger.ms=1
producer.sinks.sink_adx.channel = channel_adx
#*****************channel section**********************
#pad channel section
producer.channels.channel_pad.type = memory
producer.channels.channel_pad.capacity = 10000
producer.channels.channel_pad.transactionCapacity=1000
#adx channel section
producer.channels.channel_adx.type = memory
producer.channels.channel_adx.capacity = 10000
producer.channels.channel_adx.transactionCapacity=1000
Flume's default JVM memory setting is too small, so adjust the startup parameters; find the JAVA_OPTS line:
vi /data/soft/flume/bin/flume-ng
JAVA_OPTS="-Xms512m -Xmx3072m"
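The start command below redirects console output to /data/soft/flume/logs/cat.out; if the unpacked distribution does not already contain a logs directory (an assumption about the tarball layout), create it first:
mkdir -p /data/soft/flume/logs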
Start Flume in the background with HTTP monitoring enabled:
nohup flume-ng agent -c conf -f /data/soft/flume/conf/collect-conf.properties -n producer -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 > /data/soft/flume/logs/cat.out 2>&1 &
ps -ef | grep flume
Visit http://IP:34545 to see the JSON metrics returned by the monitoring endpoint.
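From the shell, the same JSON can be fetched with curl (assuming the /metrics path that Flume's built-in HTTP monitoring server exposes):
curl http://localhost:34545/metrics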
五、Cluster Distribution
5.1 Java Distribution
Run the distribution command on the master node:
for i in {2..3};do scp -r /data/soft/java/ root@collector$i:/data/soft;done;
5.2 Zookeeper Distribution
Run the distribution command on the master node:
for i in {2..3};do scp -r /data/soft/zookeeper/ root@collector$i:/data/soft;done;
On each slave node, set myid to the number of the matching server.* entry in zoo.cfg; for example, if this node's IP corresponds to server 2, then:
echo '2'>/data/soft/zookeeper/myid
# and so on for the remaining nodes
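The myid files on the slave nodes can also be written from the master in one pass (a sketch assuming passwordless SSH as root and that the server number matches the collector suffix):
for i in {2..3};do ssh root@collector$i "echo $i > /data/soft/zookeeper/myid";done;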
5.3 Kafka Distribution
Run the distribution command on the master node:
for i in {2..3};do scp -r /data/soft/kafka/ root@collector$i:/data/soft;done;
Modify broker.id on each node:
vi /data/soft/kafka/config/server.properties
# replace broker.id; the number simply increments per node and must be unique on every node
broker.id=2
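Alternatively, broker.id can be rewritten on the slave nodes from the master (a sketch with the same SSH assumption, mapping collector2 to broker.id=2 and collector3 to broker.id=3):
for i in {2..3};do ssh root@collector$i "sed -i 's/^broker.id=.*/broker.id=$i/' /data/soft/kafka/config/server.properties";done;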
5.4 Flume Distribution
Run the distribution command on the master node:
for i in {2..3};do scp -r /data/soft/flume/ root@collector$i:/data/soft;done;
On each slave node, create the required data directories; the user running Flume needs read/write access:
mkdir -p /data/flume/pad
mkdir -p /data/flume/adx
Modify the agent name in collect-conf.properties, i.e. the first prefix in the configuration file. Note: the agent name passed with -n at startup must match the configuration file.
5.5 Profile Distribution
Finally, the environment variables added to /etc/profile during this installation merge into the following:
## java configuration
export JAVA_HOME=/data/soft/java
## zk configuration
export ZOOKEEPER_HOME=/data/soft/zookeeper
## flume configuration
export FLUME_HOME=/data/soft/flume
## kafka configuration
export KAFKA_HOME=/data/soft/kafka
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin:$PATH
After editing, distribute the profile and apply it:
for i in {2..3};do scp -r /etc/profile root@collector$i:/etc;done;
# run on each node
source /etc/profile
5.6 Node Startup Verification
5.6.1 Java Verification
Run the verification on each slave node:
java -version
# check that the correct version is displayed
5.6.2 Zookeeper Startup Verification
Start Zookeeper on each slave node and check the status; the output shows whether the node is a leader or a follower:
zkServer.sh start
zkServer.sh status
5.6.3 Kafka Startup Verification
Start Kafka on the slave nodes and check the status:
kafka-server-start.sh -daemon /data/soft/kafka/config/server.properties
# the following commands can be run on any cluster node, not necessarily the same one
kafka-console-producer.sh --broker-list collector1:9092,collector2:9092,collector3:9092 --topic demo_test
kafka-topics.sh --list --zookeeper collector1:2181,collector2:2181,collector3:2181
5.6.4 Flume Startup Verification
Start Flume in the background with HTTP monitoring enabled. Note: it is best to give each node a distinct agent name, which makes cluster management easier later; the -n argument on the command line must match the prefix used in the corresponding configuration file, e.g. -n producer1.
nohup flume-ng agent -c conf -f /data/soft/flume/conf/collect-conf.properties -n producer -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 > /data/soft/flume/logs/cat.out 2>&1 &
ps -ef | grep flume
Visit http://IP:34545 to see the JSON metrics returned by the monitoring endpoint.
Verify that the Zookeeper, Kafka, and Flume integration works:
mv (do not copy) a text file into the Flume spool directory /data/flume/pad; the data should then be readable with:
kafka-console-consumer.sh --zookeeper collector1:2181,collector2:2181,collector3:2181 --from-beginning --topic pad_report_data
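For a concrete end-to-end check, move a throwaway file (the name below is just an example) into the spool directory; once Flume has processed it, the file is renamed with a .COMPLETED suffix and its lines should appear in the consumer above:
echo 'hello flume kafka' > /tmp/pad_test.log
mv /tmp/pad_test.log /data/flume/pad/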