Log System in Practice - Configuring the Log Flow (Part 7)

[ 2017-09-03 ]

The log system's data flow has gone through several iterations before we settled on the current design.


Design Points

  • Keep the data path as short as possible (fewer points of failure)
  • Load balanced, horizontally scalable, with a degree of disaster tolerance
  • Minimize changes to the collection-layer configuration (fewer Flume restarts and less impact on the system)

Data Flow Diagram

(Figure: data flow)


Flow Details

Business Layer

  • Source: mainly Taildir and Spooling Directory (avoid Exec Source where possible)
  • Channel: high volume -> Memory Channel (speed first); low volume -> File Channel (safety first)
  • Sink: talks to the collection layer via load balancing (sinkgroups load_balance) over Avro Sink

Collection Layer

  • Source: talks to the business layer via Avro (Avro Source)
  • Channel: mainly Kafka Channel (balances speed and safety)
  • Sink: HDFS Sink -> HDFS cluster, Kafka Sink -> Kafka cluster, Avro Sink -> local disk (on other machines)

Custom Components

  • Sub interceptor: truncates logs at a character limit so oversized log lines cannot enter the system and trigger exception rollbacks
  • File Sink: splits logs by stream and by time and writes them to relatively reliable local disk (short-term storage), so data can be recovered from disk when something goes wrong (a sketch appears at the end of this post)
  • Header channel selector: when adding a new log source, only the business-layer configuration needs to change; the collection layer automatically routes each event to the corresponding Channel, so its configuration stays unchanged (see the sketch below)
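
The Header channel selector is what keeps the collection-layer configuration static. Its source code is not included in this post, so the following is only a minimal sketch, reconstructed from how it is wired up in the configs below (selector.type = HeaderChannelSelector plus a space-separated channels header set at the business layer); the class name, the header key, and the fallback behaviour are assumptions for illustration, not the original implementation.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.AbstractChannelSelector;

// Sketch of a header-driven channel selector (illustrative, not the original code)
public class HeaderChannelSelector extends AbstractChannelSelector {

    // Header set at the business layer, e.g.
    // "channel_to_file channel_to_hdfs channel_to_kafka"
    private static final String CHANNELS_HEADER = "channels";

    @Override
    public List<Channel> getRequiredChannels(Event event) {
        String header = event.getHeaders().get(CHANNELS_HEADER);
        if (header == null || header.trim().isEmpty()) {
            // No routing info: fall back to every channel declared for the source
            return getAllChannels();
        }
        // A production version would cache this map instead of rebuilding it per event
        Map<String, Channel> byName = getChannelNameMap();
        List<Channel> selected = new ArrayList<Channel>();
        for (String name : header.trim().split("\\s+")) {
            Channel channel = byName.get(name);
            if (channel != null) {
                selected.add(channel);
            }
        }
        return selected.isEmpty() ? getAllChannels() : selected;
    }

    @Override
    public List<Channel> getOptionalChannels(Event event) {
        return Collections.emptyList();
    }

    @Override
    public void configure(Context context) {
        // Nothing to configure in this sketch
    }
}

For a custom selector, Flume normally expects selector.type to be the fully qualified class name; the short name used in the collector example below is presumably just shorthand.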

Configuration Examples

Business-layer configuration example

agent.sources = [source_team_project_log]
agent.channels = [channel_to_avro]
agent.sinkgroups = [sink_avro_group]
agent.sinks = [sink_avro_s1] [sink_avro_s2]

#-------------------- SOURCE --------------------#
# [source_team_project_log]
agent.sources.[source_team_project_log].type = TAILDIR
agent.sources.[source_team_project_log].positionFile = /data/flume/source_team_project_log/position.json
# Each filegroup collects only one type of log (makes it easier to monitor log volume)
agent.sources.[source_team_project_log].filegroups = [f1]
agent.sources.[source_team_project_log].filegroups.[f1] = /data/access.log
# Used for the Kafka topic and for the HDFS / File Sink paths
agent.sources.[source_team_project_log].headers.[f1].topic = team_project_log
# Routed into the collection layer's three channels: [channel_to_file], [channel_to_hdfs], [channel_to_kafka]
agent.sources.[source_team_project_log].headers.[f1].channels = [channel_to_file] [channel_to_hdfs] [channel_to_kafka]
agent.sources.[source_team_project_log].interceptors = [i1]
# Provides the time portion of the HDFS Sink / File Sink paths
agent.sources.[source_team_project_log].interceptors.[i1].type = timestamp
agent.sources.[source_team_project_log].channels = [channel_to_avro]

#-------------------- CHANNEL --------------------#
# [channel_to_avro]
agent.channels.[channel_to_avro].type = file
agent.channels.[channel_to_avro].dataDirs = /data/flume/channel_to_avro/data
agent.channels.[channel_to_avro].checkpointDir = /data/flume/channel_to_avro/checkpoint
agent.channels.[channel_to_avro].capacity = 10000000
agent.channels.[channel_to_avro].byteCapacity = 5000000000
agent.channels.[channel_to_avro].transactionCapacity = 100000

#--------------------- SINK ----------------------#
# [sink_avro_group]
agent.sinkgroups.[sink_avro_group].sinks = [sink_avro_s1] [sink_avro_s2]
# Load balancing (also provides a degree of failover)
agent.sinkgroups.[sink_avro_group].processor.type = load_balance
# Use the backoff algorithm
agent.sinkgroups.[sink_avro_group].processor.backoff = true

agent.sinks.[sink_avro_s1].channel = [channel_to_avro]
agent.sinks.[sink_avro_s1].type = avro
agent.sinks.[sink_avro_s1].hostname = s1
agent.sinks.[sink_avro_s1].port = 4399
agent.sinks.[sink_avro_s1].batch-size = 10000
agent.sinks.[sink_avro_s1].compression-type = deflate

agent.sinks.[sink_avro_s2].channel = [channel_to_avro]
agent.sinks.[sink_avro_s2].type = avro
agent.sinks.[sink_avro_s2].hostname = s2
agent.sinks.[sink_avro_s2].port = 4399
agent.sinks.[sink_avro_s2].batch-size = 10000
agent.sinks.[sink_avro_s2].compression-type = deflate

Collection-layer collector configuration example

collector.sources = [source_avro]
collector.channels = [channel_to_file] [channel_to_hdfs] [channel_to_kafka]
collector.sinks = [sink_avro] [sink_hdfs] [sink_kafka]

#-------------------- SOURCE --------------------#
# [source_avro]
collector.sources.[source_avro].type = avro
# No need to bind to a specific IP
collector.sources.[source_avro].bind = 0.0.0.0
collector.sources.[source_avro].port = 4399
collector.sources.[source_avro].threads = 10
collector.sources.[source_avro].compression-type = deflate
# Custom component: routes each event to the channel(s) named in its "channels" header
collector.sources.[source_avro].selector.type = HeaderChannelSelector
collector.sources.[source_avro].channels = [channel_to_file] [channel_to_hdfs] [channel_to_kafka]

#-------------------- CHANNEL --------------------#
# [channel_to_file]
collector.channels.[channel_to_file].type = org.apache.flume.channel.kafka.KafkaChannel
collector.channels.[channel_to_file].kafka.bootstrap.servers = s3:9092,s4:9092,s5:9092
collector.channels.[channel_to_file].kafka.topic = flume_s1_channel_to_file
collector.channels.[channel_to_file].kafka.producer.compression.type = snappy
collector.channels.[channel_to_file].kafka.producer.batch.size = 50000
collector.channels.[channel_to_file].kafka.consumer.session.timeout.ms = 300000
collector.channels.[channel_to_file].kafka.consumer.request.timeout.ms = 360000

# [channel_to_hdfs]
collector.channels.[channel_to_hdfs].type = org.apache.flume.channel.kafka.KafkaChannel
collector.channels.[channel_to_hdfs].kafka.bootstrap.servers = s3:9092,s4:9092,s5:9092
collector.channels.[channel_to_hdfs].kafka.topic = flume_s1_channel_to_hdfs
collector.channels.[channel_to_hdfs].kafka.producer.compression.type = snappy
collector.channels.[channel_to_hdfs].kafka.producer.batch.size = 50000
collector.channels.[channel_to_hdfs].kafka.consumer.session.timeout.ms = 300000
collector.channels.[channel_to_hdfs].kafka.consumer.request.timeout.ms = 360000

# [channel_to_kafka]
collector.channels.[channel_to_kafka].type = org.apache.flume.channel.kafka.KafkaChannel
collector.channels.[channel_to_kafka].kafka.bootstrap.servers = s3:9092,s4:9092,s5:9092
collector.channels.[channel_to_kafka].kafka.topic = flume_s1_channel_to_kafka
collector.channels.[channel_to_kafka].kafka.producer.compression.type = snappy
collector.channels.[channel_to_kafka].kafka.producer.batch.size = 50000
collector.channels.[channel_to_kafka].kafka.consumer.session.timeout.ms = 300000
collector.channels.[channel_to_kafka].kafka.consumer.request.timeout.ms = 360000

#--------------------- SINK ----------------------#
# [sink_avro]
# Ships the data to another machine, which then writes it to disk via File Sink
collector.sinks.[sink_avro].channel = [channel_to_file]
collector.sinks.[sink_avro].type = avro
collector.sinks.[sink_avro].hostname = s3
collector.sinks.[sink_avro].port = 4400
collector.sinks.[sink_avro].batch-size = 50000
collector.sinks.[sink_avro].compression-type = deflate

# [sink_hdfs]
collector.sinks.[sink_hdfs].channel = [channel_to_hdfs]
collector.sinks.[sink_hdfs].type = hdfs
# Output path: /logsys/<topic header value>/<%Y%m%d>
collector.sinks.[sink_hdfs].hdfs.path = hdfs://nns/logsys/%{topic}/%Y%m%d
collector.sinks.[sink_hdfs].hdfs.filePrefix = %H%M
# If there are multiple HDFS Sinks, give each a distinct fileSuffix
collector.sinks.[sink_hdfs].hdfs.fileSuffix = .collector1.log
collector.sinks.[sink_hdfs].hdfs.batchSize = 50000
collector.sinks.[sink_hdfs].hdfs.fileType = DataStream
collector.sinks.[sink_hdfs].hdfs.minBlockReplicas = 1
collector.sinks.[sink_hdfs].hdfs.callTimeout = 60000
collector.sinks.[sink_hdfs].hdfs.idleTimeout = 300
# Roll a new file every 5 minutes
collector.sinks.[sink_hdfs].hdfs.rollInterval = 300
# Do not roll by size
collector.sinks.[sink_hdfs].hdfs.rollSize = 0
# Do not roll by event count
collector.sinks.[sink_hdfs].hdfs.rollCount = 0

# [sink_kafka]
# Writes each event to the Kafka topic named in its "topic" header
collector.sinks.[sink_kafka].channel = [channel_to_kafka]
collector.sinks.[sink_kafka].type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.[sink_kafka].kafka.bootstrap.servers = s3:9092,s4:9092,s5:9092
collector.sinks.[sink_kafka].flumeBatchSize = 50000
# Use snappy compression
collector.sinks.[sink_kafka].kafka.producer.compression.type = snappy

Collection-layer File Sink configuration example

filesink.sources = [source_avro]
filesink.channels = [channel_to_file]
filesink.sinks = [sink_file]

#-------------------- SOURCE --------------------#
# [source_avro]
filesink.sources.[source_avro].type = avro
filesink.sources.[source_avro].bind = 0.0.0.0
filesink.sources.[source_avro].port = 4400
filesink.sources.[source_avro].threads = 10
filesink.sources.[source_avro].compression-type = deflate
filesink.sources.[source_avro].channels = [channel_to_file]

#-------------------- CHANNEL --------------------#
# [channel_to_file]
filesink.channels.[channel_to_file].type = memory
filesink.channels.[channel_to_file].capacity = 1000000
filesink.channels.[channel_to_file].byteCapacity = 500000000
filesink.channels.[channel_to_file].transactionCapacity = 100000

#--------------------- SINK ----------------------#
# [sink_file]
filesink.sinks.[sink_file].channel = [channel_to_file]
# Custom component: splits by stream and by time and writes to disk
filesink.sinks.[sink_file].type = FileSink
filesink.sinks.[sink_file].batchSize = 50000
# Output path: /data/filesink/<topic header value>/<%Y%m%d>/<%H%i>.log
filesink.sinks.[sink_file].path = /data/filesink/
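
The File Sink referenced above is also a custom component whose code is not part of this post. As a rough idea of the behaviour described earlier (split by the topic header, split by time, write to local disk), here is a minimal sketch; the class name, the header keys, and the simplistic open-append-close file handling are illustrative assumptions rather than the original implementation, which would keep file handles open and roll them by the minute.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Sketch of a sink that splits events onto local disk by topic and time
public class FileSink extends AbstractSink implements Configurable {

    private String basePath;
    private int batchSize;

    @Override
    public void configure(Context context) {
        basePath = context.getString("path", "/data/filesink/");
        batchSize = context.getInteger("batchSize", 1000);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            int taken = 0;
            while (taken < batchSize) {
                Event event = channel.take();
                if (event == null) {
                    break;                       // channel is drained for now
                }
                append(event);
                taken++;
            }
            tx.commit();
            return taken == 0 ? Status.BACKOFF : Status.READY;
        } catch (Throwable t) {
            tx.rollback();
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new EventDeliveryException("Failed to write events to disk", t);
        } finally {
            tx.close();
        }
    }

    // Append one event body to <basePath>/<topic>/<yyyyMMdd>/<HHmm>.log
    private void append(Event event) throws IOException {
        Map<String, String> headers = event.getHeaders();
        String topic = headers.get("topic") != null ? headers.get("topic") : "unknown";
        // The timestamp header is set by the business-layer timestamp interceptor
        String ts = headers.get("timestamp");
        Date when = ts != null ? new Date(Long.parseLong(ts)) : new Date();
        File dir = new File(new File(basePath, topic),
                new SimpleDateFormat("yyyyMMdd").format(when));
        dir.mkdirs();
        File file = new File(dir, new SimpleDateFormat("HHmm").format(when) + ".log");
        FileOutputStream out = new FileOutputStream(file, true);
        try {
            out.write(event.getBody());
            out.write('\n');
        } finally {
            out.close();
        }
    }
}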