| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 | 
							- # source, channel, sink definition  
 
- # Names of the channels this agent owns; each is configured under agent.channels.<name>.
 
- agent.channels = spooling tail
 
- # Names of the sources: a spooling-directory reader and a TAILDIR file tailer.
 
- agent.sources = spooling-source tail-source
 
- # Names of the sinks: one HDFS writer and one Kafka producer.
 
- agent.sinks = hdfs kafka
 
-   
 
- # Channel  
 
- # Define a memory channel (alternative, currently disabled)
 
- #agent.channels.spooling.type = memory
 
- #agent.channels.tail.type = memory
 
- # Define a file channel: events are persisted on disk.
 
- # checkpointDir holds the channel checkpoint, dataDirs holds the event log files;
 
- # each channel gets its own pair of directories.
 
- agent.channels.spooling.type = file
 
- agent.channels.spooling.checkpointDir = /root/flume/spooling/tmp
 
- agent.channels.spooling.dataDirs = /root/flume/spooling/data
 
- agent.channels.tail.type = file
 
- agent.channels.tail.checkpointDir = /root/flume/tail/tmp
 
- agent.channels.tail.dataDirs = /root/flume/tail/data
 
-   
 
- # Source
 
- # Spooling-directory source: ingests files dropped into spoolDir and writes
 
- # the events to the "spooling" channel.
 
- agent.sources.spooling-source.type = spooldir
 
- agent.sources.spooling-source.spoolDir = /root/flume/input
 
- agent.sources.spooling-source.channels = spooling
 
- # Add the file's absolute path to each event under the header key "file".
 
- agent.sources.spooling-source.fileHeader = true
 
- agent.sources.spooling-source.fileHeaderKey=file
 
- # Add the file's base name to each event under the header key "basename".
 
- agent.sources.spooling-source.basenameHeader=true  
 
- agent.sources.spooling-source.basenameHeaderKey=basename
 
- ## Whether to delete the source file once its contents have been fully sent.
 
- # "immediate" deletes the file right after it has been sent, which saves disk space;
 
- # "never" (used here) keeps the file on disk.
 
- agent.sources.spooling-source.deletePolicy=never
 
- # TAILDIR source: follows the listed log files and writes the events to the "tail" channel.
 
- agent.sources.tail-source.type=TAILDIR  
 
- agent.sources.tail-source.channels=tail  
 
- ## To avoid writing custom Flume extension code, each tailed file gets its own file group.
 
- agent.sources.tail-source.filegroups=www error  
 
- agent.sources.tail-source.filegroups.www=/data/logs/agent/www.log  
 
- agent.sources.tail-source.filegroups.error=/data/logs/agent/error.log  
 
- ## TAILDIR periodically saves the read position of each tailed file so that it can
 
- ## resume from there after an interruption. The position file is in JSON format.
 
- agent.sources.tail-source.positionFile=/data/flume/.flume/ch-tail/taildir_position.json  
 
- ## One Kafka topic per tailed file: every event of a file group carries a "topic" header.
 
- agent.sources.tail-source.headers.www.topic=agent-www  
 
- agent.sources.tail-source.headers.error.topic=agent-error  
 
- # For files that have no entry in the position file yet, start reading at the end of the file.
 
- agent.sources.tail-source.skipToEnd=true  
 
- # Interceptors: i1 adds a timestamp header; i2 adds the agent's IP address under the header "host".
 
- agent.sources.tail-source.interceptors=i1 i2  
 
- agent.sources.tail-source.interceptors.i1.type=timestamp  
 
- agent.sources.tail-source.interceptors.i2.type=host  
 
- agent.sources.tail-source.interceptors.i2.useIP=true  
 
- agent.sources.tail-source.interceptors.i2.hostHeader=host  
 
- # Sink
 
- # HDFS sink: drains the "spooling" channel into files under hdfs.path.
 
- agent.sinks.hdfs.channel = spooling
 
- agent.sinks.hdfs.type = hdfs
 
- agent.sinks.hdfs.hdfs.path = hdfs://data-hadoop:9000/flume
 
- # DataStream writes the events as a plain, uncompressed stream (no SequenceFile container).
 
- agent.sinks.hdfs.hdfs.fileType = DataStream
 
- # NOTE(review): the Flume user guide spells the accepted values "Text" / "Writable" - confirm "TEXT" is accepted.
 
- agent.sinks.hdfs.hdfs.writeFormat = TEXT
 
- # Roll (close and start a new) file every 4 seconds.
 
- # NOTE(review): such a short interval can produce many small HDFS files - confirm this is intended.
 
- agent.sinks.hdfs.hdfs.rollInterval = 4
 
- # Kafka sink: real-time data from the tail source.
 
- # The sink must read from a channel declared in agent.channels; the TAILDIR source
 
- # writes to "tail" (the previous value "ch-tail" named a channel that does not exist).
 
- agent.sinks.kafka.channel=tail
 
- agent.sinks.kafka.type=org.apache.flume.sink.kafka.KafkaSink  
 
- # Kafka cluster addresses; listing a subset of the brokers is sufficient.
 
- agent.sinks.kafka.kafka.bootstrap.servers=data-kafka:9092,data-kafka1:9092,data-kafka2:9092
 
- # Note: the topic setting does not support parameter substitution,
 
- # so for flexibility the topic is selected through the "topic" event header instead
 
- # (set per file group on the tail source).
 
- #agent.sinks.kafka.kafka.topic=agent-%{filename}  
 
- # Default is 100. Larger batches improve network efficiency but increase latency (near-real-time).
 
- agent.sinks.kafka.flumeBatchSize=32  
 
- agent.sinks.kafka.kafka.producer.acks=1  
 
- # Use the Avro event format; the Kafka message will then contain the Flume headers.
 
- # Default: false
 
- agent.sinks.kafka.useFlumeEventFormat=false  
 
 
  |