12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- # source, channel, sink definition
- agent.channels = spooling tail
- agent.sources = spooling-source tail-source
- agent.sinks = hdfs kafka
-
- # Channel
- # Define a memory channel
- #agent.channels.spooling.type = memory
- #agent.channels.tail.type = memory
- # Define a file channel
- agent.channels.spooling.type = file
- agent.channels.spooling.checkpointDir = /root/flume/spooling/tmp
- agent.channels.spooling.dataDirs = /root/flume/spooling/data
- agent.channels.tail.type = file
- agent.channels.tail.checkpointDir = /root/flume/tail/tmp
- agent.channels.tail.dataDirs = /root/flume/tail/data
-
- # Source
- agent.sources.spooling-source.type = spooldir
- agent.sources.spooling-source.spoolDir = /root/flume/input
- agent.sources.spooling-source.channels = spooling
- agent.sources.spooling-source.fileHeader = true
- agent.sources.spooling-source.fileHeaderKey=file
- agent.sources.spooling-source.basenameHeader=true
- agent.sources.spooling-source.basenameHeaderKey=basename
- ##日志发送完毕后,是否删除此源文件,
- #“immediate”表示发送完毕后立即删除,可以节约磁盘空间
- agent.sources.spooling-source.deletePolicy=never
- agent.sources.tail-source.type=TAILDIR
- agent.sources.tail-source.channels=tail
- ##本人不想写flume的扩展代码,所以就为每个tail的文件指定一个group
- agent.sources.tail-source.filegroups=www error
- agent.sources.tail-source.filegroups.www=/data/logs/agent/www.log
- agent.sources.tail-source.filegroups.error=/data/logs/agent/error.log
- ##对于taildir,需要间歇性的保存tail文件的位置,以便中断后可以继续
- ##json格式文件
- agent.sources.tail-source.positionFile=/data/flume/.flume/ch-tail/taildir_position.json
- ##每个tail的文件,创建一个kafka topic
- agent.sources.tail-source.headers.www.topic=agent-www
- agent.sources.tail-source.headers.error.topic=agent-error
- agent.sources.tail-source.skipToEnd=true
- agent.sources.tail-source.interceptors=i1 i2
- agent.sources.tail-source.interceptors.i1.type=timestamp
- agent.sources.tail-source.interceptors.i2.type=host
- agent.sources.tail-source.interceptors.i2.useIP=true
- agent.sources.tail-source.interceptors.i2.hostHeader=host
- # Sink
- agent.sinks.hdfs.channel = spooling
- agent.sinks.hdfs.type = hdfs
- agent.sinks.hdfs.hdfs.path = hdfs://data-hadoop:9000/flume
- agent.sinks.hdfs.hdfs.fileType = DataStream
- agent.sinks.hdfs.hdfs.writeFormat = TEXT
- agent.sinks.hdfs.hdfs.rollInterval = 4
- # tail实时数据
- agent.sinks.kafka.channel=ch-tail
- agent.sinks.kafka.type=org.apache.flume.sink.kafka.KafkaSink
- # kafka集群地址,可以为其子集
- agent.sinks.kafka.kafka.bootstrap.servers=data-kafka:9092,data-kafka1:9092,data-kafka2:9092
- # 注意,topic中不支持参数化
- # 但是为了提高扩展性,我们把topic信息通过header方式控制
- #agent.sinks.kafka.kafka.topic=agent-%{filename}
- # default 100,值越大,网络效率越高,但是延迟越高,准实时
- agent.sinks.kafka.flumeBatchSize=32
- agent.sinks.kafka.kafka.producer.acks=1
- #use Avro-event format,will contain flume-headers
- #default : false
- agent.sinks.kafka.useFlumeEventFormat=false
|