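1. Introduction:
Based on the definitions above, we simulate the following flow (avro: a data serialization and RPC format, used here to carry events between agents over the network):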
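2. Read data from the tail command and send it to an avro port (mini01 machine)
A second node can then be configured with an avro source to relay the data and forward it to external storage. The configuration on this machine, tail-avro.conf: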
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = mini02
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Receive data from the avro port and sink it to the logger (mini02 machine)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
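Both configurations end by binding the source and the sink to channel c1, and a typo in one of those bindings is easy to miss. The following is a minimal sketch of a sanity check, not part of Flume itself: the helper name undeclared_channels is hypothetical, and it assumes the simple "key = value" layout used in this article (one property per line, channel names separated by spaces).

```shell
# Hypothetical helper (not a Flume tool): print every channel that a source
# or sink of the given agent refers to but that "<agent>.channels" never declares.
# Usage: undeclared_channels <config-file> <agent-name>
undeclared_channels() (
  file=$1
  agent=$2
  awk -v a="$agent" '
    /^[ \t]*#/ { next }                 # skip comment lines
    index($0, "=") == 0 { next }        # skip lines that are not key = value
    {
      pos = index($0, "=")
      key = substr($0, 1, pos - 1)
      val = substr($0, pos + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)  # trim surrounding whitespace
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      n = split(val, parts, /[ \t]+/)
      if (key == (a ".channels")) {
        # channels declared for the agent
        for (i = 1; i <= n; i++) declared[parts[i]] = 1
      } else if (key ~ ("^" a "\\.(sources\\.[^.]+\\.channels|sinks\\.[^.]+\\.channel)$")) {
        # channels a source or sink is bound to
        for (i = 1; i <= n; i++) used[parts[i]] = 1
      }
    }
    END {
      for (c in used) if (!(c in declared)) print c
    }
  ' "$file"
)
```

Calling undeclared_channels conf/tail-avro.conf a1 should print nothing for the configurations shown here; any channel name it prints is referenced by a source or sink without being declared.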
4. Running the commands
Run on mini02 (this assumes the section 3 configuration is saved as conf/netcat-logger.conf):
bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
Run on mini01:
bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1
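To check the receiving agent on its own, before the tail agent is involved, a file can be pushed straight at the avro source with Flume's avro-client mode. The wrapper below is only a sketch: the function name send_file_to_avro and the FLUME_NG variable are hypothetical conveniences, and it assumes it is run from the Flume installation directory, as the commands above are.

```shell
# Hypothetical wrapper around "flume-ng avro-client": send the lines of a file
# as events to an avro source.
# Usage: send_file_to_avro <host> <port> <file>
# FLUME_NG (an assumption, not a Flume setting) overrides the launcher path.
send_file_to_avro() (
  host=$1
  port=$2
  file=$3
  "${FLUME_NG:-bin/flume-ng}" avro-client --conf conf -H "$host" -p "$port" -F "$file"
)
```

For example, send_file_to_avro mini02 4141 /home/hadoop/log/test.log should make the contents of the file show up in the logger output on mini02 if the avro source is listening on port 4141.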
Meanwhile, simulate data being written on mini01:
i=1;
while(( $i<=500000 ));do echo $i >> /home/hadoop/log/test.log;sleep 0.5; let 'i++';done
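For a quick trial it can be handy to write only a few lines instead of 500000. The loop above could be wrapped as follows; this is a sketch, and the function name write_test_lines and its parameters are hypothetical.

```shell
# Hypothetical variant of the loop above: append the numbers 1..count to a
# file, pausing between writes.
# Usage: write_test_lines <file> <count> [interval-seconds]
write_test_lines() (
  file=$1
  count=$2
  interval=${3:-0.5}   # defaults to the 0.5 s pause used above
  i=1
  while [ "$i" -le "$count" ]; do
    echo "$i" >> "$file"
    sleep "$interval"
    i=$((i + 1))
  done
)
```

Running write_test_lines /home/hadoop/log/test.log 10 appends ten lines, which is usually enough to confirm that events reach the logger on mini02.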