一、业务场景 公司计划将ai产生的日志进行收集,用来给机器学习提供数据。 以下是实现方案
架构图:
使用程序直接将日志输出到kafka,然后通过logstash来进行过滤收集。
二、详细配置 1、kafka收集信息
topic: ai 分区数: 32、logstash input配置
#收集kafka配置 input{ kafka { bootstrap_servers => ["172.16.1.10:9092,172.16.1.11:9092,172.16.1.12:9092"] topics => ["ai"] #topic名称 codec => "plain" consumer_threads => 3 #分区数量 group_id => "logstash_kafka" #logstash组id client_id => "logstash_1" #client id decorate_events => false auto_offset_reset => "latest" } }配置多个logstash收集时,只需要修改client_id即可,配置如下:
#node1: input{ kafka { bootstrap_servers => ["172.16.1.10:9092,172.16.1.11:9092,172.16.1.12:9092"] topics => ["ai"] #topic名称 codec => "plain" consumer_threads => 2 #分区数量 group_id => "logstash_kafka" #logstash组id client_id => "logstash_1" #client id decorate_events => false auto_offset_reset => "latest" } } #node2 input{ kafka { bootstrap_servers => ["172.16.1.10:9092,172.16.1.11:9092,172.16.1.12:9092"] topics => ["ai"] #topic名称 codec => "plain" consumer_threads => 1 #分区数量 group_id => "logstash_kafka" #logstash组id client_id => "logstash_2" #client id decorate_events => false auto_offset_reset => "latest" } } #两个节点使用的分区数量相加之和不能大于3。3、注意事项 (1)创建kafka topic时分区数量尽量不小于3个,以便应对未来数据量增大需要扩展logstash的需要 (2)使用grok进行日志切割时最好全部写正则表达式进行,要不然数据量大时,将会导致数据延迟十分大,以下举例说明:
#最开始的grok语句 filter { grok { match => [ "message", "%{NOTSPACE:env} (?<time>[^ ]+ [^ ]+) (?<level>[^ ]+) (?<sessionId>[a-zA-Z]+\[[0-9]+\]) (?<body>(.*))" ] remove_field => ["message","@version","host"] } } #上线之后数据延迟时间达到了10小时,严重影响使用。优化后
filter { grok { match => [ "message", "(?<env>(.*)) (?<time>[^ ]+ [^ ]+) (?<level>[^ ]+) (?<sessionId>[a-zA-Z]+\[[0-9]+\]) (?<body>(.*))" ] remove_field => ["message","@version","host"] } } #上线之后数据收集变为实时。 #%{NOTSPACE:env} 这种格式严重消耗CPU,导致logstash的处理速率大大降低!