# source, channel, sink definition
agent.channels = spooling tail
agent.sources = spooling-source tail-source
agent.sinks = hdfs kafka

# Channel
# Define a memory channel
#agent.channels.spooling.type = memory
#agent.channels.tail.type = memory

# Define a file channel
agent.channels.spooling.type = file
agent.channels.spooling.checkpointDir = /root/flume/spooling/tmp
agent.channels.spooling.dataDirs = /root/flume/spooling/data

agent.channels.tail.type = file
agent.channels.tail.checkpointDir = /root/flume/tail/tmp
agent.channels.tail.dataDirs = /root/flume/tail/data

# Source
agent.sources.spooling-source.type = spooldir
agent.sources.spooling-source.spoolDir = /root/flume/input
agent.sources.spooling-source.channels = spooling
agent.sources.spooling-source.fileHeader = true
agent.sources.spooling-source.fileHeaderKey = file
agent.sources.spooling-source.basenameHeader = true
agent.sources.spooling-source.basenameHeaderKey = basename
## Whether to delete a source file once its events have been fully sent;
## "immediate" deletes the file right after delivery, which saves disk space
agent.sources.spooling-source.deletePolicy = never

agent.sources.tail-source.type = TAILDIR
agent.sources.tail-source.channels = tail
## Rather than writing a custom Flume extension, assign each tailed file its own filegroup
agent.sources.tail-source.filegroups = www error
agent.sources.tail-source.filegroups.www = /data/logs/agent/www.log
agent.sources.tail-source.filegroups.error = /data/logs/agent/error.log
## TAILDIR periodically persists the tail position of each file (as a JSON file)
## so it can resume where it left off after an interruption
agent.sources.tail-source.positionFile = /data/flume/.flume/ch-tail/taildir_position.json
## One Kafka topic per tailed file
agent.sources.tail-source.headers.www.topic = agent-www
agent.sources.tail-source.headers.error.topic = agent-error
agent.sources.tail-source.skipToEnd = true
agent.sources.tail-source.interceptors = i1 i2
agent.sources.tail-source.interceptors.i1.type = timestamp
agent.sources.tail-source.interceptors.i2.type = host
agent.sources.tail-source.interceptors.i2.useIP = true
agent.sources.tail-source.interceptors.i2.hostHeader = host

# Sink
agent.sinks.hdfs.channel = spooling
agent.sinks.hdfs.type = hdfs
agent.sinks.hdfs.hdfs.path = hdfs://data-hadoop:9000/flume
agent.sinks.hdfs.hdfs.fileType = DataStream
agent.sinks.hdfs.hdfs.writeFormat = Text
agent.sinks.hdfs.hdfs.rollInterval = 4

# Real-time tail data
agent.sinks.kafka.channel = tail
agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka cluster addresses; a subset of the full broker list is acceptable
agent.sinks.kafka.kafka.bootstrap.servers = data-kafka:9092,data-kafka1:9092,data-kafka2:9092
# Note: the topic property does not support parameterization,
# so for flexibility we route events to topics via the "topic" header instead
#agent.sinks.kafka.kafka.topic = agent-%{filename}
# Default is 100; larger values improve network efficiency but increase latency (near real-time)
agent.sinks.kafka.flumeBatchSize = 32
agent.sinks.kafka.kafka.producer.acks = 1
# Use the Avro event format so that Flume headers are preserved
# default: false
agent.sinks.kafka.useFlumeEventFormat = false
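To try this out, the agent can be started with the standard flume-ng launcher; the value passed to --name must match the `agent.` prefix used throughout the file. The config file name agent.conf and the ./conf directory below are assumptions for the example, not part of the original setup:

flume-ng agent --conf ./conf --conf-file agent.conf --name agent -Dflume.root.logger=INFO,console

Once new lines land in www.log, they should show up on the agent-www topic, which can be checked with the stock Kafka console consumer (assuming the Kafka CLI scripts are on the PATH):

kafka-console-consumer.sh --bootstrap-server data-kafka:9092 --topic agent-www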