Syslog 顧名思義就是系統日誌或系統記錄,是一種透過 TCP/IP 來傳遞記錄訊息的標準,很多網路設備像 Router、Switch、Firewall…等都有支援,它的運作模式就是架設一個 syslog 伺服器等待接收記錄訊息,在 ELK 就有 syslog 插件可以用,只不過收到的記錄訊息格式都不太一樣,每台網路設備都會自定義自己的記錄格式,收到記錄之後要有 pretty printing 的話,我們就要仰賴 Logstash 了。
產生 Logstash 設定檔
我這邊要接收的網路設備的記錄訊息是 FortiGate 200B,但是 Google 到的 grok match pattern 都沒辦法完全符合我接收到的訊息,只好以找到的 pattern 為基礎再修改成符合我要的樣子。
input {
syslog {
port => 514
}
}
filter {
grok {
match => [
"message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} appcat=%{QS:applicationCategory}",
"message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} rcvdpkt=%{INT:receivedPacket} appcat=%{QS:applicationCategory}",
"message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} rcvdpkt=%{INT:receivedPacket} appcat=%{QS:applicationCategory} craction=%{INT:clientReputationAction} crlevel=%{WORD:clientReputationLevel}",
"message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} logdesc=%{QS:logDescription} action=%{QS:action} cpu=%{INT:cpu} mem=%{INT:memory} totalsession=%{INT:totalSession} disk=%{INT:disk} bandwidth=%{DATA:bandwidth} setuprate=%{INT:setupRate} disklograte=%{INT:diskLogRate} fazlograte=%{INT:fazLogRate} msg=%{QS:msg}"
]
}
mutate {
add_field => {
"logTime" => "%{year}-%{month}-%{day} %{time}"
}
gsub => [
"srcInterface", "\"", "",
"dstInterface", "\"", "",
"srcCountry", "\"", "",
"dstCountry", "\"", "",
"service", "\"", "",
"application", "\"", "",
"applicationCategory", "\"", "",
"logDescription", "\"", "",
"action", "\"", "",
"msg", "\"", ""
]
}
date {
match => ["logTime", "YYYY-MM-dd HH:mm:ss"]
timezone => "Etc/GMT-8"
remove_field => ["logTime", "year", "month", "day", "time", "msg"]
}
}
output {
elasticsearch {
hosts => ["elasticsearch1", "elasticsearch1", "elasticsearch1"]
document_type => "firewall"
}
}
那幾個 grok 的 match pattern 著實花了我不少時間,其實最花時間的也是在這兒而已,這邊有幾個要注意的設定。
mutate 插件
我用 add_field
方法新增了一個 logTime
欄位,它把我從訊息中解析出來的 %{year}
、%{month}
、%{day}
、%{time}
重新組合成日期時間格式的字串,好可以往下丟給 date 插件處理。
我接著用 gsub
方法把陣列中各欄位內的值的雙引號用空字串取代。
date 插件
這邊要另外指定 timezone
,我們在收訊息的時候要注意一下,如果記錄的時間是從訊息本身解析出來的,要指定對的時區,比如我們身處在台灣,timezone 就指定為 Etc/GMT-8
,相關的 timezone 參數值可以參考 Joda Available Time Zones。
出現 'syslog listener died' 訊息
當我背景啟動 Logstash 後,在 Kibana 上一直看不到訊息,查看 Logstash 記錄之後才發現 syslog listener 死掉了,參考了這篇文章 UDP listener died 到 /etc/init.d/logstash
檔案中,修改 LS_USER
參數值為 root
,Logstash 就啟動正常了。
原因是因為 syslog 要 bind 的 port 是 514,小於 1024,在 Linux 裡面不是 root 身份則不允許 bind 小於 1024 的 port,因此我們修改 Logstash 背景啟動的 script,指定 Logstash 的執行身份為 root,問題就會得到解決。
順帶一提的最佳化
在 /etc/init.d/logstash
Logstash 背景啟動的 script 檔案中,有兩個參數 LS_HEAP_SIZE
、LS_OPEN_FILES
,建議調整成下面的值。
LS_HEAP_SIZE="總記憶體的一半,最大不要超過32GB。"
LS_OPEN_FILES=65535
最後,請網路設備的管理人員幫忙調整網路設備的設定,forward 一份記錄訊息給 Logstash 就搞定了,接著就等待 Kibana 顯示出訊息。