# source, channel, sink definition

agent.channels = spooling tail
agent.sources = spooling-source tail-source
agent.sinks = hdfs kafka

# Channel

# Define a memory channel
#agent.channels.spooling.type = memory
#agent.channels.tail.type = memory

# Define a file channel
agent.channels.spooling.type = file
agent.channels.spooling.checkpointDir = /root/flume/spooling/tmp
agent.channels.spooling.dataDirs = /root/flume/spooling/data
agent.channels.tail.type = file
agent.channels.tail.checkpointDir = /root/flume/tail/tmp
agent.channels.tail.dataDirs = /root/flume/tail/data

# Source
agent.sources.spooling-source.type = spooldir
agent.sources.spooling-source.spoolDir = /root/flume/input
agent.sources.spooling-source.channels = spooling
agent.sources.spooling-source.fileHeader = true
agent.sources.spooling-source.fileHeaderKey = file
agent.sources.spooling-source.basenameHeader = true
agent.sources.spooling-source.basenameHeaderKey = basename
## Whether to delete a source file after its contents have been sent;
## "immediate" deletes it as soon as it is fully sent, which saves disk space
agent.sources.spooling-source.deletePolicy = never

agent.sources.tail-source.type = TAILDIR
agent.sources.tail-source.channels = tail
## Rather than writing custom Flume extension code, give each tailed file its own file group
agent.sources.tail-source.filegroups = www error
agent.sources.tail-source.filegroups.www = /data/logs/agent/www.log
agent.sources.tail-source.filegroups.error = /data/logs/agent/error.log
## TAILDIR periodically saves the position within each tailed file so it can
## resume after an interruption; the position file is in JSON format
agent.sources.tail-source.positionFile = /data/flume/.flume/ch-tail/taildir_position.json
## Route each tailed file to its own Kafka topic via a header
agent.sources.tail-source.headers.www.topic = agent-www
agent.sources.tail-source.headers.error.topic = agent-error
agent.sources.tail-source.skipToEnd = true
agent.sources.tail-source.interceptors = i1 i2
agent.sources.tail-source.interceptors.i1.type = timestamp
agent.sources.tail-source.interceptors.i2.type = host
agent.sources.tail-source.interceptors.i2.useIP = true
agent.sources.tail-source.interceptors.i2.hostHeader = host

# Sink
agent.sinks.hdfs.channel = spooling
agent.sinks.hdfs.type = hdfs
agent.sinks.hdfs.hdfs.path = hdfs://data-hadoop:9000/flume
agent.sinks.hdfs.hdfs.fileType = DataStream
agent.sinks.hdfs.hdfs.writeFormat = Text
agent.sinks.hdfs.hdfs.rollInterval = 4

# Real-time tail data
agent.sinks.kafka.channel = tail
agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka cluster addresses; listing a subset of the brokers is enough
agent.sinks.kafka.kafka.bootstrap.servers = data-kafka:9092,data-kafka1:9092,data-kafka2:9092
# Note: the topic property does not support parameterization,
# so for flexibility the topic is controlled through the event headers instead
#agent.sinks.kafka.kafka.topic = agent-%{filename}
# Default 100; larger values improve network efficiency but increase latency (near-real-time)
agent.sinks.kafka.flumeBatchSize = 32
agent.sinks.kafka.kafka.producer.acks = 1
# When true, use the Avro event format, which preserves the Flume headers
# default: false
agent.sinks.kafka.useFlumeEventFormat = false
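To sanity-check the spooling/HDFS leg, one simple approach is to drop a file into the spool directory and wait for the spooldir source to rename it with its completion suffix (`.COMPLETED` by default). Below is a minimal sketch, assuming the agent runs on the same host and the paths from the config exist; the file name `smoke-test.log` is made up for illustration:

import os
import time

SPOOL_DIR = "/root/flume/input"  # spoolDir from the config above
TEST_FILE = os.path.join(SPOOL_DIR, "smoke-test.log")

# Write a small test file into the spooling directory.
with open(TEST_FILE, "w") as f:
    f.write("hello flume\n")

# The spooldir source renames fully ingested files with the
# ".COMPLETED" suffix (the default fileSuffix), so poll for that.
deadline = time.time() + 60
while time.time() < deadline:
    if os.path.exists(TEST_FILE + ".COMPLETED"):
        print("file ingested; check hdfs://data-hadoop:9000/flume for output")
        break
    time.sleep(1)
else:
    print("timed out waiting for ingestion")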
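To verify the Kafka leg, consume a few events from one of the per-file topics. Here is a minimal sketch using the third-party kafka-python package (an assumption; any Kafka client works), reading from the agent-www topic that the www file group's header selects:

from kafka import KafkaConsumer  # pip install kafka-python (assumption)

# Connect to the same brokers the sink is configured with.
consumer = KafkaConsumer(
    "agent-www",  # topic set via the www header above
    bootstrap_servers=[
        "data-kafka:9092",
        "data-kafka1:9092",
        "data-kafka2:9092",
    ],
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10s of silence
)

# With useFlumeEventFormat=false the message value is the raw log line.
for message in consumer:
    print(message.value.decode("utf-8", errors="replace"))

Lines appended to /data/logs/agent/www.log should appear here once the sink flushes a batch (up to flumeBatchSize = 32 events per transaction).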