
ELK Stack

Architecture

| Component | Language | Role | Memory |
|---|---|---|---|
| Filebeat | Go | Log collection, lightweight shipping | ~50MB |
| Logstash | JRuby | Log parsing, filtering, enrichment | ~1GB |
| Elasticsearch | Java | Full-text indexing, storage, search | ≥2GB |
| Kibana | Node.js | Visualization, dashboards | ~500MB |

Filebeat Configuration

filebeat.yml

```yaml
filebeat.inputs:
  # Collect Nginx logs
  - type: log
    paths:
      - /var/log/nginx/access.log
    fields:
      service: nginx
      env: production
    # Merge multiline logs (e.g. Java stack traces)
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

  # Collect container logs
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_kubernetes_metadata: ~

# Output to Kafka (recommended for production)
output.kafka:
  hosts: ["kafka-1:9092", "kafka-2:9092"]
  topic: "logs-%{[fields.service]}"
  partition.round_robin:
    reachable_only: true

# Or output directly to ES (small-scale deployments)
# output.elasticsearch:
#   hosts: ["es-1:9200"]
#   index: "logs-%{[fields.service]}-%{+yyyy.MM.dd}"
```
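The three `multiline.*` settings above are easy to misread. A minimal Python sketch of the same merge logic (`negate: true` + `match: after` means "lines that do NOT match the pattern are appended to the previous event"), assuming events start with a `YYYY-MM-DD` timestamp:

```python
import re

# Lines matching this pattern start a new event (multiline.pattern)
EVENT_START = re.compile(r"^\d{4}-\d{2}-\d{2}")

def merge_multiline(lines):
    """Mimic multiline.negate: true + multiline.match: after —
    non-matching lines are folded into the preceding event."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events

raw = [
    "2024-01-01 12:00:00 ERROR NullPointerException",
    "    at com.example.Foo.bar(Foo.java:42)",
    "    at com.example.Main.main(Main.java:10)",
    "2024-01-01 12:00:01 INFO request handled",
]
print(merge_multiline(raw))  # 2 events: the stack trace folds into the first
```

Without these settings, each stack-trace line would be shipped as a separate event and the trace would be unsearchable as a unit.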

Logstash Configuration

logstash.conf

```conf
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092"
    topics => ["logs-nginx", "logs-app"]
    group_id => "logstash-consumer"
    codec => json
  }
}

filter {
  # Parse Nginx access logs
  if [fields][service] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes}'
      }
    }
    # GeoIP lookup
    geoip { source => "client_ip" }
  }

  # JSON logs are parsed directly
  if [fields][service] == "app" {
    json { source => "message" }
  }

  # Mask sensitive data. gsub rules apply in order: mask the longer
  # card pattern first, or the 11-digit phone rule would mangle card
  # numbers before the card rule ever matches.
  mutate {
    gsub => [
      "message", "\d{16,19}", "***CARD***",
      "message", "\d{11}", "***PHONE***"
    ]
  }

  # Normalize timestamps
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["es-1:9200", "es-2:9200"]
    index => "logs-%{[fields][service]}-%{+YYYY.MM.dd}"
    # ILM index lifecycle management
    ilm_enabled => true
    ilm_rollover_alias => "logs"
    ilm_policy => "logs-policy"
  }
}
```
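The grok parse and gsub masking above can be approximated in plain regex; a minimal Python sketch (the regex is a simplification, not the full `IPORHOST`/`HTTPDATE` grok grammar):

```python
import re

# Simplified stand-in for the grok pattern above
NGINX_ACCESS = re.compile(
    r'(?P<client_ip>\S+) - (?P<user>\S+) \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<request>\S+) HTTP/(?P<http_version>[\d.]+)" '
    r'(?P<status>\d+) (?P<bytes>\d+)'
)

def mask_sensitive(message: str) -> str:
    """Equivalent of the mutate/gsub filter: mask 16-19 digit card
    numbers first, then 11-digit phone numbers."""
    message = re.sub(r"\d{16,19}", "***CARD***", message)
    message = re.sub(r"\d{11}", "***PHONE***", message)
    return message

line = '1.2.3.4 - alice [01/Jan/2024:12:00:00 +0800] "GET /api/v1/users HTTP/1.1" 200 1234'
fields = NGINX_ACCESS.match(line).groupdict()
print(fields["status"])                     # 200
print(mask_sensitive("call 13812345678"))   # call ***PHONE***
```

Running the masking in the opposite order would leave card numbers half-masked, since the 11-digit rule matches a prefix of any 16-digit number.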

Elasticsearch Index Management

ILM Index Lifecycle Policy

```json
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
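For rollover to actually trigger, the policy also needs an index template that attaches it, plus a bootstrap index behind the write alias. A sketch, assuming the `logs` rollover alias from the Logstash output above:

```json
PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs"
    }
  }
}

PUT logs-000001
{
  "aliases": {
    "logs": { "is_write_index": true }
  }
}
```

Subsequent rollovers create `logs-000002`, `logs-000003`, … and move the write alias automatically.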

Common Queries

```
# KQL (Kibana query bar)
service: "order-service" AND level: "ERROR" AND message: "timeout"

# Check index status
GET _cat/indices/logs-*?v&s=store.size:desc

# Check shard allocation
GET _cat/shards/logs-*?v
```
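The KQL query above corresponds roughly to this Query DSL (a sketch; it assumes `service` and `level` are mapped as `keyword` and `message` as `text`):

```json
GET logs-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "service": "order-service" } },
        { "term": { "level": "ERROR" } }
      ],
      "must": [
        { "match": { "message": "timeout" } }
      ]
    }
  }
}
```

Exact-value conditions go in `filter` (cacheable, no scoring); the full-text `match` stays in `must`.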

ES Performance Tuning

| Optimization | Notes |
|---|---|
| Shard count | 20-50GB per shard; avoid many small shards |
| Replica count | Set to 0 during write-heavy bulk loads, restore afterwards |
| Refresh | `refresh_interval: 30s` (default 1s) |
| Bulk writes | Batch writes, 5-15MB per request |
| Mapping | Choose `keyword` vs `text` deliberately; disable indexing on unneeded fields |
| Hot-cold separation | Hot nodes on SSD + cold nodes on HDD |
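The 5-15MB bulk guideline means batching by payload size, not by document count. A minimal sketch of chunking documents into `_bulk` NDJSON payloads (the 10MB cap and the `logs-app` index name are illustrative assumptions):

```python
import json

MAX_BATCH_BYTES = 10 * 1024 * 1024  # mid-range of the 5-15MB guideline

def bulk_batches(docs, index="logs-app", max_bytes=MAX_BATCH_BYTES):
    """Split docs into _bulk API NDJSON payloads, each capped at max_bytes."""
    batch, size = [], 0
    for doc in docs:
        action = json.dumps({"index": {"_index": index}})
        source = json.dumps(doc)
        entry_size = len(action) + len(source) + 2  # two trailing newlines
        if batch and size + entry_size > max_bytes:
            yield "".join(batch)
            batch, size = [], 0
        batch.append(action + "\n" + source + "\n")
        size += entry_size
    if batch:
        yield "".join(batch)

docs = [{"msg": "x" * 1024} for _ in range(100)]
payloads = list(bulk_batches(docs, max_bytes=16 * 1024))
```

Each payload can then be POSTed to `_bulk`; size-based batching keeps request sizes stable even when individual documents vary wildly.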

Common Interview Questions

Q1: How does the ELK stack handle high log throughput?

Answer

  1. Kafka buffering: Filebeat → Kafka → Logstash, with Kafka absorbing traffic peaks
  2. Multiple Logstash instances: scale parsing capacity horizontally
  3. ES bulk writes: use the Bulk API and raise `refresh_interval`
  4. Index sharding strategy: time-based rollover (daily/hourly) plus hot-warm-cold tiering
  5. Filter at the collector: drop DEBUG logs in Filebeat itself
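Point 5 can be done with a Filebeat processor; a sketch, assuming log lines contain the literal level string:

```yaml
processors:
  - drop_event:
      when:
        regexp:
          message: "\\bDEBUG\\b"
```

Dropping at the edge saves Kafka, Logstash, and ES capacity before the events ever leave the host.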

Q2: What is the difference between Logstash and Filebeat?

Answer

| | Filebeat | Logstash |
|---|---|---|
| Language | Go | JRuby |
| Memory | ~50MB | ~1GB |
| Features | Collection + simple filtering | Complex parsing/filtering/enrichment |
| Role | One per machine | Centralized parsing node |

Typical deployment: Filebeat (on every machine) → Kafka → Logstash (centralized processing) → ES.
