Suricataで不審なパケットを検知したら、メールなりで通知してほしいなぁって思いました。
しかし、Suricataそのものには通知機能はないっぽい?ので、サードパーティ製のアプリを活用するか自前で実装する必要があるっぽいです。
ということなので、アラート発報時にGmail / LINEにて通知してくれる通知アプリを今回作りました。
$ uname -aLinux rasp4-1 6.1.21-v8+ #1642 SMP PREEMPT Mon Apr 3 17:24:16 BST 2023 aarch64 GNU/Linux
今回、通知先にGmailとLINEを採用します。そこで前準備をします。
アプリパスワードを作成します。
2段階認証プロセスに対応していないアプリでも、アプリパスワードを使用することでGoogleアカウントにログインできるようになります。
なお、Gmail以外のメールサービスや自前のメールサーバを使いたい場合はスキップしてください。
LINE Notifyのアクセストークンを作成します。
LINE Notifyとは、LINEが提供する通知APIで無料で利用可能です。
なお、LINEを使ってない方やLINEへの通知が不要な場合はスキップしてください。
さて、前準備も済んだので実装を.....しました。
コードはGitHubにあげてますのでお好きにご活用ください。
GitHub - https://github.com/himazin331/suricata-alert-notice/tree/no_gpio
ここにもコードベタ貼りしておきます。
コード中にコメント散りばめてるので何となく分かると思いますが、ちょくちょく説明を挟んでおきます。
メール送信処理です。
なんかよしなにメールを送信してくれます。
send_mail.py1import smtplib2from email.mime.text import MIMEText3from email.mime.multipart import MIMEMultipart45from config.notice import *67# メール送信8class EMailSend():9# メール送信10def sendEmail(self, message: str):11msg: MIMEMultipart = MIMEMultipart()12msg["From"] = SENDER_EMAIL13msg["To"] = RECEIVER_EMAIL14msg["Subject"] = "[SuricataAlertNotice] 不審なパケットを検知しました!"15msg.attach(MIMEText(message, "plain"))1617# GmailのSMTPサーバーへの接続とメール送信18try:19with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:20server.starttls() # TLS暗号化の開始21server.login(SMTP_USER, SMTP_PASS) # SMTPサーバーへのログイン2223server.send_message(msg)24except Exception as e:25print(e)
LINE Notifyを通じてメッセージを送信する処理です。
いつものようにAuthorizationヘッダにトークン乗っけて、メッセージと一緒にPOSTするだけです。
send_line.py1import requests23from config.notice import LINE_NOTIFY_TOKEN45# Notify送信6class LineNotifySend():7# Line Notify API セットアップ8def __init__(self):9self.url: str = "https://notify-api.line.me/api/notify"10self.line_headers: dict[str, str] = {'Authorization': 'Bearer ' + LINE_NOTIFY_TOKEN}1112# メッセージ送信13def sendLineMessage(self, message: str):14try:15payload: dict[str, str] = {'message': message}16# Line送信17requests.post(self.url, headers=self.line_headers, params=payload)18except requests.exceptions.RequestException as e:19print(e)
main.pyです。
やってることは通知なのですが、細かく説明すると、
と、こんな感じです。
main.py1import json2import os3import time4from datetime import datetime, timezone, timedelta56from send_mail import EMailSend7from send_line import LineNotifySend89from config.general import *1011class Notice():12def __init__(self):13self.email_notice: EMailSend = EMailSend()14self.line_notify: LineNotifySend = LineNotifySend()1516self.notice_target_eve: list[dict] = []1718# 通知19def notice(self, notice_target_eve: list[dict]):20self.notice_target_eve = notice_target_eve2122message: str = "不審な通信を検知しました!\n"23message += f"件数: {len(self.notice_target_eve)} 件\n"24if not (NOTICE_TYPE is NoticeType.Nothing):25if NOTICE_TYPE is NoticeType.LineNotify or NOTICE_TYPE is NoticeType.Both: # LINE通知26self.send_line(message)27if NOTICE_TYPE is NoticeType.Email or NOTICE_TYPE is NoticeType.Both: # Email送信28self.send_email(message)2930# メッセージ作成31def create_message(self, eve: dict) -> str:32try:33message: str = "\n** 情報 **\n"34message += f"timestamp: {eve['timestamp'].strftime('%Y/%m/%d %H:%M:%S')}\n"35message += f"src: {eve['src_ip']}:{eve['src_port']} -> dest: {eve['dest_ip']}:{eve['dest_port']}\n"36message += f"protocol: {eve['proto']}\n"37message += f"\n{eve['alert']['signature']}\n"38message += f"signature ID: {eve['alert']['signature_id']}\n"39if "http" in eve:40eve_http: dict = eve["http"]41if eve_http != {}:42message += f"\n** HTTP情報 **\n"43message += f"hostname: {eve_http['hostname']}\n"44message += f"url: {eve_http['url']}\n"45message += f"{eve_http['http_content_type']} {eve_http['http_method']} {eve_http['status']}\n"46except KeyError:47pass48return message4950# Email送信51def send_email(self, message: str):52for eve in self.notice_target_eve:53message += self.create_message(eve)54message += "\n===========================================================\n"55self.email_notice.sendEmail(message)5657# LINE送信58def send_line(self, message: str):59self.line_notify.sendLineMessage(message)60for eve in self.notice_target_eve:61message = self.create_message(eve)62self.line_notify.sendLineMessage(message)6364def main():65notice: Notice = Notice()6667prev_timestamp: datetime = datetime.now(timezone(timedelta(hours=9)))68last_modified: float = 0.069current_modified: float = os.path.getmtime(EVE_JSONL_PATH)70lower_sig_id, upper_sig_id = SIGNATURE_ID_RANGE7172while True:73notice_target_eve: list[dict] = []7475if current_modified != last_modified:76# eve.json取得77with open(EVE_JSONL_PATH, "r") as eve_jsonl:78for jsonl in eve_jsonl:79eve: dict = json.loads(jsonl)8081cur_timestamp: datetime = datetime.strptime(eve["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z")82eve["timestamp"] = cur_timestamp83if cur_timestamp > prev_timestamp and "alert" in eve:84sig_id: int = eve["alert"]["signature_id"]85if lower_sig_id <= sig_id and sig_id <= upper_sig_id:86notice_target_eve.append(eve)87prev_timestamp = cur_timestamp88# 通知89if len(notice_target_eve) > 0:90notice.notice(notice_target_eve)91last_modified = current_modified92current_modified = os.path.getmtime(EVE_JSONL_PATH)93time.sleep(1)9495if __name__ == '__main__':96main()
今回、私は特にHTTP通信での脅威に目を光らせてる(笑)ので、通知メッセージにはIPやプロコトルなどのパケット情報に加えて、ホスト名やURL、HTTPメソッドなどのHTTP情報も含めるようにしてます。
HTTP以外のプロコトル情報も一緒に通知したいというのであれば、eve.jsonに乗る情報であれば自由に通知できるかと思いますので、色々カスタマイズしてみてください。
コード中に含まれる各定数は次の項目で説明します。
通知設定と認証情報とで別のファイルで分けてます。
config/general.py1from enum import Enum23class NoticeType(Enum):4Nothing = 05Email = 16LineNotify = 27Both = 389EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"10NOTICE_TYPE = NoticeType.Email11# SIGNATURE_ID_RANGE内のシグネチャのみ通知12SIGNATURE_ID_RANGE: tuple[int, int] = (2000000, 2099999) # ET Open Rulesets
NoticeTypeの各設定値はこんな意味合いです。
設定値 | 説明 | 備考 |
---|---|---|
EVE_JSONL_PATH | eve.jsonのパス | |
NOTICE_TYPE | 通知タイプ | NoticeType から指定 |
SIGNATURE_ID_RANGE | 通知対象とするシグネチャIDの範囲 | (始まり, 終わり) で指定 |
今回は、通知対象とするシグネチャIDの範囲として2000000 ~ 2099999をデフォルトに指定しました。
この範囲のシグネチャIDはET Open Rulesetsで使用しています。つまり、ET Open Rulesetsに合致したパケットに絞って通知をしてもらいます。
config/notice.py1SENDER_EMAIL: str = "your_email@example.com"2RECEIVER_EMAIL: str = "your_email@gmail.com"3SMTP_SERVER: str = "smtp.gmail.com"4SMTP_PORT: int = 5875SMTP_USER: str = "your_email@gmail.com"6SMTP_PASS: str = "**********************"78LINE_NOTIFY_TOKEN: str = "***************************" # Line Notify API Access Token
定数の説明は以下のとおりです。
定数 | 説明 | 備考 |
---|---|---|
SENDER_EMAIL | 送信者メールアドレス | |
RECEIVER_EMAIL | 受信者メールアドレス | |
SMTP_SERVER | SMTPサーバアドレス | Gmailはsmtp.gmail.com |
SMTP_PORT | SMTPポート | Gmailは587 |
SMTP_USER | 認証メールアドレス | Googleアカウント メールアドレス |
SMTP_PASS | 認証パスワード | Googleアカウント アプリパスワード |
LINE_NOTIFY_TOKEN | LINE Notifyアクセストークン |
さて、実際に動かしてみましょう。
しかし、ET Open RulesetsのシグネチャID範囲を通知対象としましたが、ここでは再現を容易にするためにデモ用のカスタムルールを通知対象とします。
通知対象とするデモ用カスタムルール↓
alert ssh any any -> 192.168.120.20 any (msg:"[Demo-SSH] SSH connection to Demo-VM"; ssh.software; content:"SSH"; nocase; sid:100001; rev:1;)alert http any any -> any any (msg:"[Demo-HTTP-1] Access to himazin331.com"; http.host; content:"himazin331.com"; sid:100002; rev:1;)alert http any any -> any any (msg:"[Demo-HTTP-2] Access to example.com"; http.host; content:"example.com"; sid:100003; rev:1;)
通知設定はこんな感じ↓
config/notice.py1from enum import Enum23class NoticeType(Enum):4Nothing = 05Email = 16LineNotify = 27Both = 389EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"10NOTICE_TYPE = NoticeType.Both1112# SIGNATURE_ID_RANGE内のシグネチャのみ通知13SIGNATURE_ID_RANGE: tuple[int, int] = (100001, 100003) # ET Open Rulesets
[Demo-HTTP-1] Access to himazin331.com
のアラート通知[Demo-HTTP-2] Access to example.com
のアラート通知[Demo-SSH] SSH connection to Demo-VM
のアラート通知こんな感じで、メールとLINEで通知することができました!
実際の運用を考えると、ET Open Rulesetsのすべてを通知するのはちょっと煩わしいかなと思います。
そもそもETの中にも"ET INFO"や"ET GAMES"といったさほど重要でないカテゴリもあるので、環境に応じて不要なカテゴリは除外しても良いのかなと思います。
ということで、ET Open Rulesetsのなかで検知の可能性が高いカテゴリのみにフィルタリングして通知するように、以下のようにちょっとだけ変更を加えました。
config/genera.py1FILTER_SIG_CATEGORY: list[str] = [2"Attack Response", "DNS", "DOS", "Exploit", "FTP",3"ICMP", "IMAP", "Malware", "NETBIOS", "Phishing",4"POP3", "RPC", "Shellcode", "SMTP", "SNMP", "SQL",5"TELNET", "TFTP", "Web Client", "Web Server", "Web Specific Apps", "WORM"6] # "ET xxx"のxxx。
main.py1def is_priority_sig(self, sig: str) -> bool:2for category in PRIORITY_SIG_CATEGORY:3if category in sig:4return True5return False67~ 略 ~89if lower_sig_id <= sig_id and sig_id <= upper_sig_id:10if is_priority_sig(eve["alert"]["signature"]): # <-- 追加11notice_target_eve.append(eve)
FILTER_SIG_CATEGORY内のカテゴリ説明だけ書いておきます。
カテゴリ | 説明 | カテゴリ | 説明 |
---|---|---|---|
Attack Response | システムへの侵入活動 | RPC | RPCに関する攻撃や脆弱性 |
DNS | DNSに関する攻撃や脆弱性 | Shellcode | 遠隔からのシェルコード実行 |
DOS | DoS攻撃の試み | SMTP | SMTPに関する攻撃や脆弱性 |
Exploit | 特定のサービスに分類されない普遍的な攻撃 | SNMP | SNMPに関する攻撃や脆弱性 |
FTP | FTPに関する攻撃や脆弱性 | SQL | SQLに関する攻撃や脆弱性 |
ICMP | ICMPに関する攻撃や脆弱性 | TELNET | Telnetに関する攻撃や脆弱性 |
IMAP | IMAPに関する攻撃や脆弱性 | TFTP | TFTPに関する攻撃や脆弱性 |
Malware | マルウェア攻撃 | Web Client | Webブラウザやcurl、wgetなどに関する攻撃や脆弱性 |
NETBIOS | NetBIOSに関する攻撃や脆弱性 | Web Server | Webサーバに関する攻撃や脆弱性 |
Phishing | フィッシング活動 | Web Specific Apps | 特定のWebアプリケーションに関する攻撃や脆弱性 |
POP3 | POP3に関する攻撃や脆弱性 | WORM | ワーム攻撃 |
詳細はET Category Descriptions (PDF)を参照してください。
いかがでしたか?通知アプリ開発編ということで自前で通知システムを実装してみただけですが...
実際の運用ではバックグラウンドで実行させたり、systemdでサービス起動させたりすると良いと思います。
また、実装上eve.jsonのファイルサイズが大きいと処理に時間がかかるかと思いますので、Suricataのほうで適切なログローテーション設定を行うことを強くおすすめします。
あと、これはそういうものなのかわからないのですが、GmailのSMTPサーバを経由してのメール送信が非常に遅く、アラート発報から約3分後にメール受信するというようなことになってます....私のネット環境の問題なのかSMTPサーバ側の問題なのかわからないのですが、もし改善策がわかる方いらっしゃいましたらTwitter(X)のDMやメールで教えてください!
ちなみに通知アプリのおまけ編も出そうと思っていて、少しずつですが作業を進めています。あまり新規性はないのですが...
ということで終わります。お疲れ様でした。