[料理佳餚] ELK 用 syslog 插件收集網路設備的 Log

Syslog 顧名思義就是系統日誌或系統記錄,是一種透過 TCP/IP 來傳遞記錄訊息的標準,很多網路設備像 Router、Switch、Firewall…等都有支援,它的運作模式就是架設一個 syslog 伺服器等待接收記錄訊息,在 ELK 就有 syslog 插件可以用,只不過收到的記錄訊息格式都不太一樣,每台網路設備都會自定義自己的記錄格式,收到記錄之後要有 pretty printing 的話,我們就要仰賴 Logstash 了。

產生 Logstash 設定檔

我這邊要接收的網路設備的記錄訊息是 FortiGate 200B,但是 Google 到的 grok match pattern 都沒辦法完全符合我接收到的訊息,只好以找到的 pattern 為基礎再修改成符合我要的樣子。

input {
    syslog {
        port => 514
    }
}
filter {
    grok {
        match => [
            "message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} appcat=%{QS:applicationCategory}",
            "message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} rcvdpkt=%{INT:receivedPacket} appcat=%{QS:applicationCategory}",
            "message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} srcip=%{IP:srcIP} srcport=%{NUMBER:srcPort} srcintf=%{DATA:srcInterface} dstip=%{IP:dstIP} dstport=%{NUMBER:dstPort} dstintf=%{DATA:dstInterface} sessionid=%{INT:sessionId} proto=%{INT:protocol} action=%{WORD:action} policyid=%{INT:policyId} dstcountry=%{QS:dstCountry} srccountry=%{QS:srcCountry} trandisp=%{WORD:tranDisplay} service=%{DATA:service} app=%{QS:application} duration=%{NUMBER:duration} sentbyte=%{INT:sentByte} rcvdbyte=%{INT:receivedByte} sentpkt=%{INT:sentPacket} rcvdpkt=%{INT:receivedPacket} appcat=%{QS:applicationCategory} craction=%{INT:clientReputationAction} crlevel=%{WORD:clientReputationLevel}",
            "message", "<%{NUMBER:syslogIndex}>date=%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day} time=%{TIME:time} devname=%{HOSTNAME:devName} devid=%{HOSTNAME:devId} logid=%{NUMBER:logId} type=%{WORD:logType} subtype=%{WORD:logSubType} level=%{WORD:level} vd=%{DATA:virtualDomain} logdesc=%{QS:logDescription} action=%{QS:action} cpu=%{INT:cpu} mem=%{INT:memory} totalsession=%{INT:totalSession} disk=%{INT:disk} bandwidth=%{DATA:bandwidth} setuprate=%{INT:setupRate} disklograte=%{INT:diskLogRate} fazlograte=%{INT:fazLogRate} msg=%{QS:msg}"
        ]
    }
    mutate {
        add_field => {
            "logTime" => "%{year}-%{month}-%{day} %{time}"
        }
        gsub => [
            "srcInterface", "\"", "",
            "dstInterface", "\"", "",
            "srcCountry", "\"", "",
            "dstCountry", "\"", "",
            "service", "\"", "",
            "application", "\"", "",
            "applicationCategory", "\"", "",
            "logDescription", "\"", "",
            "action", "\"", "",
            "msg", "\"", ""
        ]
    }
    date {
        match => ["logTime", "YYYY-MM-dd HH:mm:ss"]
        timezone => "Etc/GMT-8"
        remove_field => ["logTime", "year", "month", "day", "time", "msg"]
    }
}
output {
    elasticsearch {
        hosts => ["elasticsearch1", "elasticsearch1", "elasticsearch1"]
        document_type => "firewall"
    }
}

那幾個 grok 的 match pattern 著實花了我不少時間,其實最花時間的也是在這兒而已,這邊有幾個要注意的設定。

mutate 插件

我用 add_field 方法新增了一個 logTime 欄位,它把我從訊息中解析出來的 %{year}%{month}%{day}%{time} 重新組合成日期時間格式的字串,好可以往下丟給 date 插件處理。

我接著用 gsub 方法把陣列中各欄位內的值的雙引號用空字串取代。

date 插件

這邊要另外指定 timezone,我們在收訊息的時候要注意一下,如果記錄的時間是從訊息本身解析出來的,要指定對的時區,比如我們身處在台灣,timezone 就指定為 Etc/GMT-8,相關的 timezone 參數值可以參考 Joda Available Time Zones

出現 'syslog listener died' 訊息

當我背景啟動 Logstash 後,在 Kibana 上一直看不到訊息,查看 Logstash 記錄之後才發現 syslog listener 死掉了,參考了這篇文章 UDP listener died/etc/init.d/logstash 檔案中,修改 LS_USER 參數值為 root,Logstash 就啟動正常了。

原因是因為 syslog 要 bind 的 port 是 514,小於 1024,在 Linux 裡面不是 root 身份則不允許 bind 小於 1024 的 port,因此我們修改 Logstash 背景啟動的 script,指定 Logstash 的執行身份為 root,問題就會得到解決。

順帶一提的最佳化

/etc/init.d/logstash Logstash 背景啟動的 script 檔案中,有兩個參數 LS_HEAP_SIZELS_OPEN_FILES,建議調整成下面的值。

LS_HEAP_SIZE="總記憶體的一半,最大不要超過32GB。"
LS_OPEN_FILES=65535

最後,請網路設備的管理人員幫忙調整網路設備的設定,forward 一份記錄訊息給 Logstash 就搞定了,接著就等待 Kibana 顯示出訊息。

參考資料

相關資源

C# 指南
ASP.NET 教學
ASP.NET MVC 指引
Azure SQL Database 教學
SQL Server 教學
Xamarin.Forms 教學