Flume / Kafka / Zookeeper Integration Installation Guide

1. JDK Installation

1.1 JDK 1.8 Download

Download jdk-8u101-linux-x64.tar.gz from the Oracle website.

First locate and remove the bundled OpenJDK:

rpm -qa | grep openjdk

yum -y remove *openjdk*

Extract jdk-8u101-linux-x64.tar.gz into the installation directory. Unless noted otherwise, the installation directory throughout this guide is /data/soft/.

mv jdk-8u101-linux-x64.tar.gz /data/soft/

cd /data/soft

tar zxvf jdk-8u101-linux-x64.tar.gz

mv jdk1.8.0_101 java

Configure JAVA_HOME:

echo '## java configuration ' >> /etc/profile

echo 'export JAVA_HOME=/data/soft/java' >> /etc/profile

echo 'export PATH=.:$JAVA_HOME/bin:$PATH' >> /etc/profile

source /etc/profile

Verify that the installation succeeded:

java -version
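If the installation worked, the output should look roughly like the following (build numbers may differ):

# java version "1.8.0_101"
# Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
# Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)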

2. Zookeeper Installation

2.1 Zookeeper Download

Download zookeeper-3.4.9.tar.gz from the Zookeeper website or with wget.

2.2 Zookeeper Installation and Configuration

Move zookeeper-3.4.9.tar.gz to the installation directory and extract it:

mv zookeeper-3.4.9.tar.gz /data/soft/

cd /data/soft

tar zxvf zookeeper-3.4.9.tar.gz

mv zookeeper-3.4.9 zookeeper

Edit and save zoo.cfg with the following configuration:

vi /data/soft/zookeeper/conf/zoo.cfg

# Replace the contents with the following configuration

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/soft/zookeeper

clientPort=2181

server.1=collector1:2888:3999

server.2=collector2:2888:3999

server.3=collector3:2888:3999

Edit the hosts file to map the hostnames referenced by server.*:

vi /etc/hosts

# Add the hostname mappings for the cluster

10.20.26.184 collector1

10.20.26.185 collector2

10.20.26.186 collector3

Create a file named myid under dataDir containing the number of this node's server.* entry:

echo '1'>/data/soft/zookeeper/myid

Add the Zookeeper environment variables:

echo '## zk configuration ' >> /etc/profile

echo 'export ZOOKEEPER_HOME=/data/soft/zookeeper' >> /etc/profile

echo 'export PATH=$ZOOKEEPER_HOME/bin:$PATH' >> /etc/profile

source /etc/profile

Start Zookeeper and check its status; the status output shows whether the node is a leader or a follower:

zkServer.sh start

zkServer.sh status
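On a healthy node the status command reports its mode; the exact paths vary, but the output is roughly:

# ZooKeeper JMX enabled by default
# Using config: /data/soft/zookeeper/bin/../conf/zoo.cfg
# Mode: follower (or Mode: leader on the elected leader)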

3. Kafka Installation

3.1 Kafka Download

Download kafka_2.10-0.10.1.1.tgz from the Kafka website or with wget.

3.2 Kafka Installation and Configuration

Move it to the installation directory /data/soft and extract it:

mv /root/kafka_2.10-0.10.1.1.tgz /data/soft/

cd /data/soft

tar zxvf kafka_2.10-0.10.1.1.tgz

mv kafka_2.10-0.10.1.1 kafka

Edit the configuration file:

vi /data/soft/kafka/config/server.properties

# Replace with the following

broker.id=1

auto.create.topics.enable=true

delete.topic.enable=true

default.replication.factor=2

num.network.threads=3

num.io.threads=8

num.partitions=3

num.recovery.threads.per.data.dir=1

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/data/soft/kafka/logs

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connection.timeout.ms=6000

zookeeper.connect=collector1:2181,collector2:2181,collector3:2181

Create the log directory and grant read/write permissions:

mkdir -p /data/soft/kafka/logs

chmod -R 777 /data/soft/kafka/logs

Add the Kafka environment variables:

echo '## kafka configuration ' >> /etc/profile

echo 'export KAFKA_HOME=/data/soft/kafka' >> /etc/profile

echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile

source /etc/profile

Adjust the Kafka heap size:

vi /data/soft/kafka/bin/kafka-server-start.sh

# Insert near the top of the script; raise the values if the machine has plenty of memory

export KAFKA_HEAP_OPTS="-Xmx3G -Xms1G"

# save and quit (:wq)

Start Kafka and verify it is working:

kafka-server-start.sh -daemon /data/soft/kafka/config/server.properties

kafka-console-producer.sh --broker-list collector1:9092,collector2:9092,collector3:9092 --topic demo_test

kafka-topics.sh --list --zookeeper collector1:2181,collector2:2181,collector3:2181
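Because auto.create.topics.enable=true, producing to demo_test creates the topic on first use. To make the round trip explicit, the topic can also be created and read back by hand; a minimal sketch using the stock Kafka 0.10 command-line tools (replication factor and partition count follow the server.properties above):

kafka-topics.sh --create --zookeeper collector1:2181,collector2:2181,collector3:2181 --replication-factor 2 --partitions 3 --topic demo_test

# Type a few lines into the console producer started above, then read them back:

kafka-console-consumer.sh --zookeeper collector1:2181,collector2:2181,collector3:2181 --from-beginning --topic demo_test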

4. Flume Installation

4.1 Flume Download

Download apache-flume-1.7.0-bin.tar.gz from the Flume website or with wget.

If you need to modify the source code, also download the source package apache-flume-1.7.0-src.tar.gz.

Flume requires JDK 1.7 or later, which was already installed above.

4.2 Flume Installation and Configuration

Move apache-flume-1.7.0-bin.tar.gz to the installation directory and extract it:

mkdir -p /data/soft

mv apache-flume-1.7.0-bin.tar.gz /data/soft

cd /data/soft

tar zxvf apache-flume-1.7.0-bin.tar.gz

mv apache-flume-1.7.0-bin flume

Configure the FLUME_HOME environment variable:

echo '## flume configuration ' >> /etc/profile

echo 'export FLUME_HOME=/data/soft/flume' >> /etc/profile

echo 'export PATH=$FLUME_HOME/bin:$PATH' >> /etc/profile

source /etc/profile

Check the version; if it prints correctly, the installation succeeded. Then build a source -> channel -> sink configuration for your own workload. Here collect-conf.properties is used as an example:

mkdir -p /data/flume/pad

mkdir -p /data/flume/adx

flume-ng version

# create the properties file

vi collect-conf.properties

# save and quit (:wq)

The configuration is as follows:

############################################

# producer config

###########################################

#*****************agent section**********************

producer.sources = source_pad source_adx

producer.channels = channel_pad channel_adx

producer.sinks = sink_pad sink_adx

#*****************source section**********************

#pad source section

producer.sources.source_pad.type = spooldir

producer.sources.source_pad.channels = channel_pad

producer.sources.source_pad.fileHeader = true

producer.sources.source_pad.fileHeaderKey=PAD

producer.sources.source_pad.spoolDir=/data/flume/pad

producer.sources.source_pad.batchSize=1000

producer.sources.source_pad.bufferMaxLineLength=10000

#adx source section

producer.sources.source_adx.type = spooldir

producer.sources.source_adx.channels = channel_adx

producer.sources.source_adx.fileHeader = true

producer.sources.source_adx.fileHeaderKey=ADX

producer.sources.source_adx.spoolDir=/data/flume/adx

producer.sources.source_adx.batchSize=1000

producer.sources.source_adx.bufferMaxLineLength=10000

#*****************sink section**********************

#pad sink section

producer.sinks.sink_pad.type = org.apache.flume.sink.kafka.KafkaSink

producer.sinks.sink_pad.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092

producer.sinks.sink_pad.kafka.flumeBatchSize=1000

# acks: 1 = leader only, 0 = no acknowledgement, -1 = all replicas

producer.sinks.sink_pad.kafka.producer.acks=1

producer.sinks.sink_pad.kafka.producer.type=sync

producer.sinks.sink_pad.kafka.topic=pad_report_data

producer.sinks.sink_pad.kafka.producer.compression.type = snappy

producer.sinks.sink_pad.kafka.producer.linger.ms=1

producer.sinks.sink_pad.channel = channel_pad

#adx sink section

producer.sinks.sink_adx.type = org.apache.flume.sink.kafka.KafkaSink

producer.sinks.sink_adx.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092

producer.sinks.sink_adx.kafka.flumeBatchSize=200

# acks: 1 = leader only, 0 = no acknowledgement, -1 = all replicas

producer.sinks.sink_adx.kafka.producer.acks=1

producer.sinks.sink_adx.kafka.producer.type=sync

producer.sinks.sink_adx.kafka.topic=adx_report_data

producer.sinks.sink_adx.kafka.producer.compression.type = snappy

producer.sinks.sink_adx.kafka.producer.linger.ms=1

producer.sinks.sink_adx.channel = channel_adx

#*****************channel section**********************

#pad channel section

producer.channels.channel_pad.type = memory

producer.channels.channel_pad.capacity = 10000

producer.channels.channel_pad.transactionCapacity=1000

#adx channel section

producer.channels.channel_adx.type = memory

producer.channels.channel_adx.capacity = 10000

producer.channels.channel_adx.transactionCapacity=1000

Because the default Flume JVM heap is too small, adjust the startup options. Find the JAVA_OPTS line and change it:

vi /data/soft/flume/bin/flume-ng

JAVA_OPTS="-Xms512m -Xmx3072m"

Start Flume in the background with HTTP monitoring enabled (create /data/soft/flume/logs first if it does not already exist):

nohup flume-ng agent -c conf -f /data/soft/flume/conf/collect-conf.properties -n producer -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 >/data/soft/flume/logs/cat.out 2>&1 &

ps -ef | grep flume

Visit the monitoring port at http://IP:34545 to see the JSON status returned by the agent.
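A quick way to check the endpoint from the shell; a minimal sketch, assuming the agent runs on collector1 and that Flume's HTTP metrics server answers on the /metrics path (component names depend on your configuration):

curl http://collector1:34545/metrics

# Expected: one JSON object keyed by component, roughly:
# {"SOURCE.source_pad":{"EventReceivedCount":"...","EventAcceptedCount":"...",...},
#  "CHANNEL.channel_pad":{"ChannelSize":"...",...},
#  "SINK.sink_pad":{"EventDrainSuccessCount":"...",...}}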

5. Cluster Distribution

5.1 Java Distribution

Run the distribution command on the primary node:

for i in {2..3};do scp -r /data/soft/java/ root@collector$i:/data/soft;done;

5.2 Zookeeper Distribution

Run the distribution command on the primary node:

for i in {2..3};do scp -r /data/soft/zookeeper/ root@collector$i:/data/soft;done;

On each follower node, set myid to the number of that node's server.* entry in zoo.cfg. For example, if the node's IP corresponds to server.2:

echo '2'>/data/soft/zookeeper/myid

# and so on for the remaining nodes (see the sketch below for doing this remotely)
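Because the node number matches the loop index, the myid files can also be written from the primary node over ssh; a minimal sketch assuming password-less root ssh, as already used for the scp commands above:

for i in {2..3};do ssh root@collector$i "echo $i > /data/soft/zookeeper/myid";done;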

5.3 Kafka Distribution

Run the distribution command on the primary node:

for i in {2..3};do scp -r /data/soft/kafka/ root@collector$i:/data/soft;done;

On each node, change broker.id:

vi /data/soft/kafka/config/server.properties

# Set broker.id to a value that increments with the node number; it must be unique on every node (a scripted alternative follows below), e.g. on collector2:

broker.id=2
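The same edit can be scripted from the primary node; a minimal sketch, again assuming root ssh access and that broker.id simply follows the collector number:

for i in {2..3};do ssh root@collector$i "sed -i 's/^broker.id=.*/broker.id=$i/' /data/soft/kafka/config/server.properties";done;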

5.4 Flume Distribution

Run the distribution command on the primary node:

for i in {2..3};do scp -r /data/soft/flume/ root@collector$i:/data/soft;done;

On each follower node, create the required data directories; the user running Flume needs read/write access to them:

mkdir -p /data/flume/pad

mkdir -p /data/flume/adx

Change the agent name in collect-conf.properties (the first prefix in the configuration file). Note: the agent name passed to the -n startup parameter must match the configuration file.

5.5 Profile Distribution

Finally, the environment variables configured in this guide merge into the following /etc/profile block:

## java configuration

export JAVA_HOME=/data/soft/java

## zk configuration

export ZOOKEEPER_HOME=/data/soft/zookeeper

## flume configuration

export FLUME_HOME=/data/soft/flume

## kafka configuration

export KAFKA_HOME=/data/soft/kafka

export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin:$PATH

After editing, distribute the profile and apply it:

for i in {2..3};do scp -r /etc/profile root@collector$i:/etc;done;

# Run on every node

source /etc/profile

5.6 Startup Verification on Each Node

5.6.1 Java Verification

Run the check on each follower node:

java -version

# Confirm that the correct version is printed

5.6.2 Zookeeper Startup Verification

Start Zookeeper on each follower node and check its status; the output shows whether the node is a leader or a follower:

zkServer.sh start

zkServer.sh status

5.6.3 Kafka Startup Verification

Start Kafka on the follower nodes and check the status:

kafka-server-start.sh -daemon /data/soft/kafka/config/server.properties

# The following commands can be run on any node in the cluster; they do not have to be run on the same node

kafka-console-producer.sh --broker-list collector1:9092,collector2:9092,collector3:9092 --topic demo_test

kafka-topics.sh --list --zookeeper collector1:2181,collector2:2181,collector3:2181

5.6.4 Flume Startup Verification

Start Flume in the background with HTTP monitoring enabled. Note: it is best to give each node a distinct agent name, which makes managing the cluster easier later; the -n command-line parameter must match the prefix in the corresponding configuration file, e.g. -n producer1.

nohup flume-ng agent -c conf -f /data/soft/flume/conf/collect-conf.properties -n producer -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 >/data/soft/flume/logs/cat.out 2>&1 &

ps -ef | grep flume

Visit the monitoring port at http://IP:34545 to see the JSON status returned by the agent.

Verify that the Zookeeper, Kafka, and Flume integration works end to end:

mv (do not copy) a text file into the Flume spool directory /data/flume/pad; the data should then be readable with the console consumer:

kafka-console-consumer.sh --zookeeper collector1:2181,collector2:2181,collector3:2181 --from-beginning --topic pad_report_data
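A concrete smoke test, as a minimal sketch (test.log is a hypothetical file name and the content is arbitrary):

echo "hello flume kafka" > /tmp/test.log

mv /tmp/test.log /data/flume/pad/

# The consumer above should print "hello flume kafka";
# Flume renames the spooled file to test.log.COMPLETED once it has been delivered.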

