Suricata with Raspberry Piで遊ぼう!【通知アプリ開発編】

概要

Suricataで不審なパケットを検知したら、メールなりで通知してほしいなぁって思いました。

しかし、Suricataそのものには通知機能はないっぽい?ので、サードパーティ製のアプリを活用するか自前で実装する必要があるっぽいです。

ということなので、アラート発報時にGmail / LINEにて通知してくれる通知アプリを今回作りました。

前提

環境

  • Raspberry Pi 4 Model B [RAM 4GB, Raspberry Pi OS LIte (64-bit)]
    $ uname -a
    Linux rasp4-1 6.1.21-v8+ #1642 SMP PREEMPT Mon Apr 3 17:24:16 BST 2023 aarch64 GNU/Linux
  • Suricata 6.0.13
  • Python 3.9.2

前準備

今回、通知先にGmailとLINEを採用します。そこで前準備をします。

Gmailアプリパスワードの作成

アプリパスワードを作成します。

2段階認証プロセスに対応していないアプリでも、アプリパスワードを使用することでGoogleアカウントにログインできるようになります。

なお、Gmail以外のメールサービスや自前のメールサーバを使いたい場合はスキップしてください。

  1. 以下のリンクより2段階認証プロセスを有効にします。(案内に従って有効にしてください)
    Googleアカウント セキュリティ - https://myaccount.google.com/security
  2. 2段階認証プロセスを有効にしたら、以下のリンクよりアプリパスワードを作成します。
    Googleアカウント アプリパスワード - https://myaccount.google.com/apppasswords

    アプリを選択 > その他 > (任意の名前) で、"生成"をクリックします。
    アプリパスワード作成画面
  3. 生成されたアプリパスワードが表示されます。
    これは後ほど使うので控えておきます。
    生成されたアプリパスワード
    完了ボタンをクリックすると前の画面に戻り、作成したアプリ名と作成日が確認できます。
    アプリパスワード一覧

LINE Notifyアクセストークンの作成

LINE Notifyのアクセストークンを作成します。

LINE Notifyとは、LINEが提供する通知APIで無料で利用可能です。
なお、LINEを使ってない方やLINEへの通知が不要な場合はスキップしてください。

  1. 以下のリンク先にアクセスし、LINEにログインします。
    LINE Notify - https://notify-bot.line.me/ja/

    画面上部右にある"ログイン"からログインします。
    LINE Notifyトップ画面
  2. ログイン後に下のようなステップを踏んでください。
    トークン発行
  3. トークン名と送信先のトークルームを選択して、"発行する"をクリックします。
    今回はLINE Notifyと1対1で通知を受け取ります。

    Suricataアラート通知受信用にグループを作っておいて、それを選択してもいいかもですね。
    トークン発行画面
  4. 発行されたトークンを"コピー"でコピーし控えておきます。
    (もちろん下のトークンはダミーです)
    発行されたトークン
    閉じると、連携済みのサービスということで一覧に表示されます。
    連携中サービス一覧

実装

さて、前準備も済んだので実装を.....しました。

コードはGitHubにあげてますのでお好きにご活用ください。
GitHub - https://github.com/himazin331/suricata-alert-notice/tree/no_gpio

ここにもコードベタ貼りしておきます。
コード中にコメント散りばめてるので何となく分かると思いますが、ちょくちょく説明を挟んでおきます。

メール送信

メール送信処理です。

なんかよしなにメールを送信してくれます。

send_mail.py
1
import smtplib
2
from email.mime.text import MIMEText
3
from email.mime.multipart import MIMEMultipart
4
5
from config.notice import *
6
7
# メール送信
8
class EMailSend():
9
# メール送信
10
def sendEmail(self, message: str):
11
msg: MIMEMultipart = MIMEMultipart()
12
msg["From"] = SENDER_EMAIL
13
msg["To"] = RECEIVER_EMAIL
14
msg["Subject"] = "[SuricataAlertNotice] 不審なパケットを検知しました!"
15
msg.attach(MIMEText(message, "plain"))
16
17
# GmailのSMTPサーバーへの接続とメール送信
18
try:
19
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
20
server.starttls() # TLS暗号化の開始
21
server.login(SMTP_USER, SMTP_PASS) # SMTPサーバーへのログイン
22
23
server.send_message(msg)
24
except Exception as e:
25
print(e)

LINE送信

LINE Notifyを通じてメッセージを送信する処理です。

いつものようにAuthorizationヘッダにトークン乗っけて、メッセージと一緒にPOSTするだけです。

send_line.py
1
import requests
2
3
from config.notice import LINE_NOTIFY_TOKEN
4
5
# Notify送信
6
class LineNotifySend():
7
# Line Notify API セットアップ
8
def __init__(self):
9
self.url: str = "https://notify-api.line.me/api/notify"
10
self.line_headers: dict[str, str] = {'Authorization': 'Bearer ' + LINE_NOTIFY_TOKEN}
11
12
# メッセージ送信
13
def sendLineMessage(self, message: str):
14
try:
15
payload: dict[str, str] = {'message': message}
16
# Line送信
17
requests.post(self.url, headers=self.line_headers, params=payload)
18
except requests.exceptions.RequestException as e:
19
print(e)

メイン処理

main.pyです。

やってることは通知なのですが、細かく説明すると、

  1. eve.jsonに更新があるかをチェック。(ファイル更新日時を監視する)
  2. 更新があったら、eve.jsonの読み取りとparse。
  3. 新規のレコードかつアラートであることを示すalertを含むかをみる。
  4. 通知対象のシグネチャIDであるかをみる。通知対象であれば一旦リストにappend。
  5. 2と3をeve.jsonの最終行まで繰り返す。
  6. リスト内のレコードからパケット情報を取り出してメッセージを作成。
  7. メッセージをメールまたはLINE送信処理へ渡す。
  8. 1~7を1秒間隔でループ。

と、こんな感じです。

main.py
1
import json
2
import os
3
import time
4
from datetime import datetime, timezone, timedelta
5
6
from send_mail import EMailSend
7
from send_line import LineNotifySend
8
9
from config.general import *
10
11
class Notice():
12
def __init__(self):
13
self.email_notice: EMailSend = EMailSend()
14
self.line_notify: LineNotifySend = LineNotifySend()
15
16
self.notice_target_eve: list[dict] = []
17
18
# 通知
19
def notice(self, notice_target_eve: list[dict]):
20
self.notice_target_eve = notice_target_eve
21
22
message: str = "不審な通信を検知しました!\n"
23
message += f"件数: {len(self.notice_target_eve)} 件\n"
24
if not (NOTICE_TYPE is NoticeType.Nothing):
25
if NOTICE_TYPE is NoticeType.LineNotify or NOTICE_TYPE is NoticeType.Both: # LINE通知
26
self.send_line(message)
27
if NOTICE_TYPE is NoticeType.Email or NOTICE_TYPE is NoticeType.Both: # Email送信
28
self.send_email(message)
29
30
# メッセージ作成
31
def create_message(self, eve: dict) -> str:
32
try:
33
message: str = "\n** 情報 **\n"
34
message += f"timestamp: {eve['timestamp'].strftime('%Y/%m/%d %H:%M:%S')}\n"
35
message += f"src: {eve['src_ip']}:{eve['src_port']} -> dest: {eve['dest_ip']}:{eve['dest_port']}\n"
36
message += f"protocol: {eve['proto']}\n"
37
message += f"\n{eve['alert']['signature']}\n"
38
message += f"signature ID: {eve['alert']['signature_id']}\n"
39
if "http" in eve:
40
eve_http: dict = eve["http"]
41
if eve_http != {}:
42
message += f"\n** HTTP情報 **\n"
43
message += f"hostname: {eve_http['hostname']}\n"
44
message += f"url: {eve_http['url']}\n"
45
message += f"{eve_http['http_content_type']} {eve_http['http_method']} {eve_http['status']}\n"
46
except KeyError:
47
pass
48
return message
49
50
# Email送信
51
def send_email(self, message: str):
52
for eve in self.notice_target_eve:
53
message += self.create_message(eve)
54
message += "\n===========================================================\n"
55
self.email_notice.sendEmail(message)
56
57
# LINE送信
58
def send_line(self, message: str):
59
self.line_notify.sendLineMessage(message)
60
for eve in self.notice_target_eve:
61
message = self.create_message(eve)
62
self.line_notify.sendLineMessage(message)
63
64
def main():
65
notice: Notice = Notice()
66
67
prev_timestamp: datetime = datetime.now(timezone(timedelta(hours=9)))
68
last_modified: float = 0.0
69
current_modified: float = os.path.getmtime(EVE_JSONL_PATH)
70
lower_sig_id, upper_sig_id = SIGNATURE_ID_RANGE
71
72
while True:
73
notice_target_eve: list[dict] = []
74
75
if current_modified != last_modified:
76
# eve.json取得
77
with open(EVE_JSONL_PATH, "r") as eve_jsonl:
78
for jsonl in eve_jsonl:
79
eve: dict = json.loads(jsonl)
80
81
cur_timestamp: datetime = datetime.strptime(eve["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z")
82
eve["timestamp"] = cur_timestamp
83
if cur_timestamp > prev_timestamp and "alert" in eve:
84
sig_id: int = eve["alert"]["signature_id"]
85
if lower_sig_id <= sig_id and sig_id <= upper_sig_id:
86
notice_target_eve.append(eve)
87
prev_timestamp = cur_timestamp
88
# 通知
89
if len(notice_target_eve) > 0:
90
notice.notice(notice_target_eve)
91
last_modified = current_modified
92
current_modified = os.path.getmtime(EVE_JSONL_PATH)
93
time.sleep(1)
94
95
if __name__ == '__main__':
96
main()

今回、私は特にHTTP通信での脅威に目を光らせてる(笑)ので、通知メッセージにはIPやプロコトルなどのパケット情報に加えて、ホスト名やURL、HTTPメソッドなどのHTTP情報も含めるようにしてます。

HTTP以外のプロコトル情報も一緒に通知したいというのであれば、eve.jsonに乗る情報であれば自由に通知できるかと思いますので、色々カスタマイズしてみてください。

コード中に含まれる各定数は次の項目で説明します。

コンフィグ的な

通知設定と認証情報とで別のファイルで分けてます。

通知設定

config/general.py
1
from enum import Enum
2
3
class NoticeType(Enum):
4
Nothing = 0
5
Email = 1
6
LineNotify = 2
7
Both = 3
8
9
EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"
10
NOTICE_TYPE = NoticeType.Email
11
# SIGNATURE_ID_RANGE内のシグネチャのみ通知
12
SIGNATURE_ID_RANGE: tuple[int, int] = (2000000, 2099999) # ET Open Rulesets

NoticeTypeの各設定値はこんな意味合いです。

設定値説明備考
EVE_JSONL_PATHeve.jsonのパス
NOTICE_TYPE通知タイプNoticeTypeから指定
SIGNATURE_ID_RANGE通知対象とするシグネチャIDの範囲(始まり, 終わり)で指定

今回は、通知対象とするシグネチャIDの範囲として2000000 ~ 2099999をデフォルトに指定しました。

この範囲のシグネチャIDはET Open Rulesetsで使用しています。つまり、ET Open Rulesetsに合致したパケットに絞って通知をしてもらいます。

認証情報

config/notice.py
1
SENDER_EMAIL: str = "your_email@example.com"
2
RECEIVER_EMAIL: str = "your_email@gmail.com"
3
SMTP_SERVER: str = "smtp.gmail.com"
4
SMTP_PORT: int = 587
5
SMTP_USER: str = "your_email@gmail.com"
6
SMTP_PASS: str = "**********************"
7
8
LINE_NOTIFY_TOKEN: str = "***************************" # Line Notify API Access Token

定数の説明は以下のとおりです。

定数説明備考
SENDER_EMAIL送信者メールアドレス
RECEIVER_EMAIL受信者メールアドレス
SMTP_SERVERSMTPサーバアドレスGmailはsmtp.gmail.com
SMTP_PORTSMTPポートGmailは587
SMTP_USER認証メールアドレスGoogleアカウント メールアドレス
SMTP_PASS認証パスワードGoogleアカウント アプリパスワード
LINE_NOTIFY_TOKENLINE Notifyアクセストークン

実行

さて、実際に動かしてみましょう。

しかし、ET Open RulesetsのシグネチャID範囲を通知対象としましたが、ここでは再現を容易にするためにデモ用のカスタムルールを通知対象とします。

通知対象とするデモ用カスタムルール↓

alert ssh any any -> 192.168.120.20 any (msg:"[Demo-SSH] SSH connection to Demo-VM"; ssh.software; content:"SSH"; nocase; sid:100001; rev:1;)
alert http any any -> any any (msg:"[Demo-HTTP-1] Access to himazin331.com"; http.host; content:"himazin331.com"; sid:100002; rev:1;)
alert http any any -> any any (msg:"[Demo-HTTP-2] Access to example.com"; http.host; content:"example.com"; sid:100003; rev:1;)

通知設定はこんな感じ↓

config/notice.py
1
from enum import Enum
2
3
class NoticeType(Enum):
4
Nothing = 0
5
Email = 1
6
LineNotify = 2
7
Both = 3
8
9
EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"
10
NOTICE_TYPE = NoticeType.Both
11
12
# SIGNATURE_ID_RANGE内のシグネチャのみ通知
13
SIGNATURE_ID_RANGE: tuple[int, int] = (100001, 100003) # ET Open Rulesets

動作結果

  • [Demo-HTTP-1] Access to himazin331.comのアラート通知
    Demo-HTTP-1の通知
  • [Demo-HTTP-2] Access to example.comのアラート通知
    Demo-HTTP-2の通知
  • [Demo-SSH] SSH connection to Demo-VMのアラート通知
    Demo-SSHの通知

こんな感じで、メールとLINEで通知することができました!

おまけ

実際の運用を考えると、ET Open Rulesetsのすべてを通知するのはちょっと煩わしいかなと思います。

そもそもETの中にも"ET INFO"や"ET GAMES"といったさほど重要でないカテゴリもあるので、環境に応じて不要なカテゴリは除外しても良いのかなと思います。

ということで、ET Open Rulesetsのなかで検知の可能性が高いカテゴリのみにフィルタリングして通知するように、以下のようにちょっとだけ変更を加えました。

config/genera.py
1
FILTER_SIG_CATEGORY: list[str] = [
2
"Attack Response", "DNS", "DOS", "Exploit", "FTP",
3
"ICMP", "IMAP", "Malware", "NETBIOS", "Phishing",
4
"POP3", "RPC", "Shellcode", "SMTP", "SNMP", "SQL",
5
"TELNET", "TFTP", "Web Client", "Web Server", "Web Specific Apps", "WORM"
6
] # "ET xxx"のxxx。
main.py
1
def is_priority_sig(self, sig: str) -> bool:
2
for category in PRIORITY_SIG_CATEGORY:
3
if category in sig:
4
return True
5
return False
6
7
~~
8
9
if lower_sig_id <= sig_id and sig_id <= upper_sig_id:
10
if is_priority_sig(eve["alert"]["signature"]): # <-- 追加
11
notice_target_eve.append(eve)

FILTER_SIG_CATEGORY内のカテゴリ説明だけ書いておきます。

カテゴリ説明カテゴリ説明
Attack Responseシステムへの侵入活動RPCRPCに関する攻撃や脆弱性
DNSDNSに関する攻撃や脆弱性Shellcode遠隔からのシェルコード実行
DOSDoS攻撃の試みSMTPSMTPに関する攻撃や脆弱性
Exploit特定のサービスに分類されない普遍的な攻撃SNMPSNMPに関する攻撃や脆弱性
FTPFTPに関する攻撃や脆弱性SQLSQLに関する攻撃や脆弱性
ICMPICMPに関する攻撃や脆弱性TELNETTelnetに関する攻撃や脆弱性
IMAPIMAPに関する攻撃や脆弱性TFTPTFTPに関する攻撃や脆弱性
Malwareマルウェア攻撃Web ClientWebブラウザやcurl、wgetなどに関する攻撃や脆弱性
NETBIOSNetBIOSに関する攻撃や脆弱性Web ServerWebサーバに関する攻撃や脆弱性
Phishingフィッシング活動Web Specific Apps特定のWebアプリケーションに関する攻撃や脆弱性
POP3POP3に関する攻撃や脆弱性WORMワーム攻撃

詳細はET Category Descriptions (PDF)を参照してください。

おわりに

いかがでしたか?通知アプリ開発編ということで自前で通知システムを実装してみただけですが...
実際の運用ではバックグラウンドで実行させたり、systemdでサービス起動させたりすると良いと思います。

また、実装上eve.jsonのファイルサイズが大きいと処理に時間がかかるかと思いますので、Suricataのほうで適切なログローテーション設定を行うことを強くおすすめします。

あと、これはそういうものなのかわからないのですが、GmailのSMTPサーバを経由してのメール送信が非常に遅く、アラート発報から約3分後にメール受信するというようなことになってます....私のネット環境の問題なのかSMTPサーバ側の問題なのかわからないのですが、もし改善策がわかる方いらっしゃいましたらTwitter(X)のDMやメールで教えてください!

ちなみに通知アプリのおまけ編も出そうと思っていて、少しずつですが作業を進めています。あまり新規性はないのですが...

ということで終わります。お疲れ様でした。

参考