# flume.conf — Apache Flume agent configuration
# Run with: flume-ng agent -n agent -f flume.conf
#
# Two independent pipelines:
#   1. spooldir source -> file channel "spooling" -> HDFS sink
#   2. TAILDIR source  -> file channel "tail"     -> Kafka sink

# source, channel, sink definition
agent.channels = spooling tail
agent.sources = spooling-source tail-source
agent.sinks = hdfs kafka

# Channel
# A memory channel is faster but loses buffered events on restart;
# the file channels below persist events to disk instead.
#agent.channels.spooling.type = memory
#agent.channels.tail.type = memory

# Define a file channel
agent.channels.spooling.type = file
agent.channels.spooling.checkpointDir = /root/flume/spooling/tmp
agent.channels.spooling.dataDirs = /root/flume/spooling/data
agent.channels.tail.type = file
agent.channels.tail.checkpointDir = /root/flume/tail/tmp
agent.channels.tail.dataDirs = /root/flume/tail/data

# Source
agent.sources.spooling-source.type = spooldir
agent.sources.spooling-source.spoolDir = /root/flume/input
agent.sources.spooling-source.channels = spooling
# Attach the absolute path and base name of each ingested file
# as event headers ("file" / "basename").
agent.sources.spooling-source.fileHeader = true
agent.sources.spooling-source.fileHeaderKey = file
agent.sources.spooling-source.basenameHeader = true
agent.sources.spooling-source.basenameHeaderKey = basename
# Whether to delete a source file after it has been fully sent:
# "immediate" deletes it right away to save disk space; "never" keeps it.
agent.sources.spooling-source.deletePolicy = never

agent.sources.tail-source.type = TAILDIR
agent.sources.tail-source.channels = tail
# One filegroup per tailed file, so no custom Flume extension code is needed.
agent.sources.tail-source.filegroups = www error
agent.sources.tail-source.filegroups.www = /data/logs/agent/www.log
agent.sources.tail-source.filegroups.error = /data/logs/agent/error.log
# TAILDIR periodically records the read position of each tailed file
# in this JSON file so tailing can resume after an interruption.
agent.sources.tail-source.positionFile = /data/flume/.flume/ch-tail/taildir_position.json
# Route each tailed file to its own Kafka topic via the "topic" header.
agent.sources.tail-source.headers.www.topic = agent-www
agent.sources.tail-source.headers.error.topic = agent-error
agent.sources.tail-source.skipToEnd = true
agent.sources.tail-source.interceptors = i1 i2
agent.sources.tail-source.interceptors.i1.type = timestamp
agent.sources.tail-source.interceptors.i2.type = host
agent.sources.tail-source.interceptors.i2.useIP = true
agent.sources.tail-source.interceptors.i2.hostHeader = host

# Sink
agent.sinks.hdfs.channel = spooling
agent.sinks.hdfs.type = hdfs
agent.sinks.hdfs.hdfs.path = hdfs://data-hadoop:9000/flume
agent.sinks.hdfs.hdfs.fileType = DataStream
agent.sinks.hdfs.hdfs.writeFormat = TEXT
# seconds before rolling the current HDFS file
agent.sinks.hdfs.hdfs.rollInterval = 4

# Kafka sink for the real-time tail data.
# FIX: was "ch-tail", which matches no declared channel (only "spooling"
# and "tail" are defined above) — the Kafka sink could never attach.
agent.sinks.kafka.channel = tail
agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka cluster bootstrap servers; a subset of the brokers is sufficient.
agent.sinks.kafka.kafka.bootstrap.servers = data-kafka:9092,data-kafka1:9092,data-kafka2:9092
# Note: kafka.topic does not support parameterization here. For
# flexibility the topic is taken from the per-event "topic" header
# set by the TAILDIR source, so this static setting stays disabled.
#agent.sinks.kafka.kafka.topic = agent-xxx
# default 100; larger batches improve network efficiency but add
# latency (near-real-time delivery)
agent.sinks.kafka.flumeBatchSize = 32
agent.sinks.kafka.kafka.producer.acks = 1
# use Avro event format (output will contain flume headers)
# default: false
agent.sinks.kafka.useFlumeEventFormat = false