Suricata with Raspberry Piで遊ぼう!【通知アプリ開発編】

概要

Suricataで不審なパケットを検知したら、メールなりで通知してほしいなぁって思いました。
しかし、Suricataそのものには通知機能はないっぽい?ので、サードパーティ製のアプリを活用するか自前で実装する必要があるっぽいです。
ということなので、アラート発報時にGmail / LINEにて通知してくれる通知アプリを今回作りました。

前提

環境

  • Raspberry Pi 4 Model B [RAM 4GB, Raspberry Pi OS LIte (64-bit)]
$ uname -a
Linux rasp4-1 6.1.21-v8+ #1642 SMP PREEMPT Mon Apr  3 17:24:16 BST 2023 aarch64 GNU/Linux
  • Suricata 6.0.13
  • Python 3.9.2

前準備

今回、通知先にGmailとLINEを採用します。そこで前準備をします。

Gmailアプリパスワードの作成

アプリパスワードを作成します。
2段階認証プロセスに対応していないアプリでも、アプリパスワードを使用することでGoogleアカウントにログインできるようになります。
なお、Gmail以外のメールサービスや自前のメールサーバを使いたい場合はスキップしてください。

1.以下のリンクより2段階認証プロセスを有効にします。(案内に従って有効にしてください)

Googleアカウント セキュリティ - https://myaccount.google.com/security

2.2段階認証プロセスを有効にしたら、以下のリンクよりアプリパスワードを作成します。

Googleアカウント アプリパスワード - https://myaccount.google.com/apppasswords


アプリを選択 > その他 > (任意の名前) で、"生成"をクリックします。

アプリパスワード作成画面

3.生成されたアプリパスワードが表示されます。

これは後ほど使うので控えておきます。

生成されたアプリパスワード


完了ボタンをクリックすると前の画面に戻り、作成したアプリ名と作成日が確認できます。

アプリパスワード一覧

LINE Notifyアクセストークンの作成

LINE Notifyのアクセストークンを作成します。
LINE Notifyとは、LINEが提供する通知APIで無料で利用可能です。
なお、LINEを使ってない方やLINEへの通知が不要な場合はスキップしてください。

1.以下のリンク先にアクセスし、LINEにログインします。

LINE Notify - https://notify-bot.line.me/ja/


画面上部右にある"ログイン"からログインします。

LINE Notifyトップ画面

2.ログイン後に下のようなステップを踏んでください。

トークン発行

3.トークン名と送信先のトークルームを選択して、"発行する"をクリックします。

今回はLINE Notifyと1対1で通知を受け取ります。

Suricataアラート通知受信用にグループを作っておいて、それを選択してもいいかもですね。

トークン発行画面

4.発行されたトークンを"コピー"でコピーし控えておきます。

(もちろん下のトークンはダミーです)

発行されたトークン

閉じると、連携済みのサービスということで一覧に表示されます。

連携中サービス一覧

実装

さて、前準備も済んだので実装を.....しました。

コードはGitHubにあげてますのでお好きにご活用ください。
GitHub - https://github.com/himazin331/suricata-alert-notice/tree/no_gpio

ここにもコードベタ貼りしておきます。
コード中にコメント散りばめてるので何となく分かると思いますが、ちょくちょく説明を挟んでおきます。

メール送信

メール送信処理です。
なんかよしなにメールを送信してくれます。

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from config.notice import *

# メール送信
class EMailSend():
    # メール送信
    def sendEmail(self, message: str):
        msg: MIMEMultipart = MIMEMultipart()
        msg["From"] = SENDER_EMAIL
        msg["To"] = RECEIVER_EMAIL
        msg["Subject"] = "[SuricataAlertNotice] 不審なパケットを検知しました!"
        msg.attach(MIMEText(message, "plain"))

        # GmailのSMTPサーバーへの接続とメール送信
        try:
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
                server.starttls() # TLS暗号化の開始
                server.login(SMTP_USER, SMTP_PASS) # SMTPサーバーへのログイン

                server.send_message(msg)
        except Exception as e:
            print(e)

LINE送信

LINE Notifyを通じてメッセージを送信する処理です。
いつものようにAuthorizationヘッダにトークン乗っけて、メッセージと一緒にPOSTするだけです。

import requests

from config.notice import LINE_NOTIFY_TOKEN

# Notify送信
class LineNotifySend():
    # Line Notify API セットアップ
    def __init__(self):
        self.url: str = "https://notify-api.line.me/api/notify"
        self.line_headers: dict[str, str] = {'Authorization': 'Bearer ' + LINE_NOTIFY_TOKEN}

    # メッセージ送信
    def sendLineMessage(self, message: str):
        try:
            payload: dict[str, str] = {'message': message}
            # Line送信
            requests.post(self.url, headers=self.line_headers, params=payload)
        except requests.exceptions.RequestException as e:
            print(e)

メイン処理

main.pyです。

やってることは通知なのですが、細かく説明すると、

  1. eve.jsonに更新があるかをチェック。(ファイル更新日時を監視する)
  2. 更新があったら、eve.jsonの読み取りとparse。
  3. 新規のレコードかつアラートであることを示すalertを含むかをみる。
  4. 通知対象のシグネチャIDであるかをみる。通知対象であれば一旦リストにappend。
  5. 2と3をeve.jsonの最終行まで繰り返す。
  6. リスト内のレコードからパケット情報を取り出してメッセージを作成。
  7. メッセージをメールまたはLINE送信処理へ渡す。
  8. 1~7を1秒間隔でループ。

と、こんな感じです。

import json
import os
import time
from datetime import datetime, timezone, timedelta

from send_mail import EMailSend
from send_line import LineNotifySend

from config.general import *

class Notice():
    def __init__(self):
        self.email_notice: EMailSend = EMailSend()
        self.line_notify: LineNotifySend = LineNotifySend()

        self.notice_target_eve: list[dict] = []

    # 通知
    def notice(self, notice_target_eve: list[dict]):
        self.notice_target_eve = notice_target_eve

        message: str = "不審な通信を検知しました!\n"
        message += f"件数: {len(self.notice_target_eve)} 件\n"
        if not (NOTICE_TYPE is NoticeType.Nothing):
            if NOTICE_TYPE is NoticeType.LineNotify or NOTICE_TYPE is NoticeType.Both: # LINE通知
                self.send_line(message)
            if NOTICE_TYPE is NoticeType.Email or NOTICE_TYPE is NoticeType.Both: # Email送信
                self.send_email(message)

    # メッセージ作成
    def create_message(self, eve: dict) -> str:
        try:
            message: str = "\n** 情報 **\n"
            message += f"timestamp: {eve['timestamp'].strftime('%Y/%m/%d %H:%M:%S')}\n"
            message += f"src: {eve['src_ip']}:{eve['src_port']} -> dest: {eve['dest_ip']}:{eve['dest_port']}\n"
            message += f"protocol: {eve['proto']}\n"
            message += f"\n{eve['alert']['signature']}\n"
            message += f"signature ID: {eve['alert']['signature_id']}\n"
            if "http" in eve:
                eve_http: dict = eve["http"]
                if eve_http != {}:
                    message += f"\n** HTTP情報 **\n"
                    message += f"hostname: {eve_http['hostname']}\n"
                    message += f"url: {eve_http['url']}\n"
                    message += f"{eve_http['http_content_type']} {eve_http['http_method']} {eve_http['status']}\n"
        except KeyError:
            pass
        return message

    # Email送信
    def send_email(self, message: str):
        for eve in self.notice_target_eve:
            message += self.create_message(eve)
            message += "\n===========================================================\n"
        self.email_notice.sendEmail(message)
    
    # LINE送信
    def send_line(self, message: str):
        self.line_notify.sendLineMessage(message)
        for eve in self.notice_target_eve:
            message = self.create_message(eve)
            self.line_notify.sendLineMessage(message)

def main():
    notice: Notice = Notice()

    prev_timestamp: datetime = datetime.now(timezone(timedelta(hours=9)))
    last_modified: float = 0.0
    current_modified: float = os.path.getmtime(EVE_JSONL_PATH)
    lower_sig_id, upper_sig_id = SIGNATURE_ID_RANGE

    while True:
        notice_target_eve: list[dict] = []

        if current_modified != last_modified:
            # eve.json取得
            with open(EVE_JSONL_PATH, "r") as eve_jsonl:
                for jsonl in eve_jsonl:
                    eve: dict = json.loads(jsonl)

                    cur_timestamp: datetime = datetime.strptime(eve["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z")
                    eve["timestamp"] = cur_timestamp
                    if cur_timestamp > prev_timestamp and "alert" in eve:
                        sig_id: int = eve["alert"]["signature_id"] 
                        if lower_sig_id <= sig_id and sig_id <= upper_sig_id:
                            notice_target_eve.append(eve)
                        prev_timestamp = cur_timestamp
            # 通知
            if len(notice_target_eve) > 0:
                notice.notice(notice_target_eve)
            last_modified = current_modified
        current_modified = os.path.getmtime(EVE_JSONL_PATH)
        time.sleep(1)

if __name__ == '__main__':
    main()

今回、私は特にHTTP通信での脅威に目を光らせてる(笑)ので、通知メッセージにはIPやプロコトルなどのパケット情報に加えて、
ホスト名やURL、HTTPメソッドなどのHTTP情報も含めるようにしてます。
HTTP以外のプロコトル情報も一緒に通知したいというのであれば、eve.jsonに乗る情報であれば自由に通知できるかと思いますので、
色々カスタマイズしてみてください。

コード中に含まれる各定数は次の項目で説明します。

コンフィグ的な

通知設定と認証情報とで別のファイルで分けてます。

通知設定

from enum import Enum

class NoticeType(Enum):
    Nothing = 0
    Email = 1
    LineNotify = 2
    Both = 3

EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"
NOTICE_TYPE = NoticeType.Email
# SIGNATURE_ID_RANGE内のシグネチャのみ通知
SIGNATURE_ID_RANGE: tuple[int, int] = (2000000, 2099999) # ET Open Rulesets

NoticeTypeの各設定値はこんな意味合いです。

通知タイプ

説明

Nothing

通知しない (動作確認用)

Email

メール通知のみ

LineNotify

LINE通知のみ

Both

メール/LINE通知

 

その他の設定値は以下のとおりです。

設定値

説明

備考

EVE_JSONL_PATH

eve.jsonのパス

NOTICE_TYPE

通知タイプ

NoticeTypeから指定

SIGNATURE_ID_RANGE

通知対象とするシグネチャIDの範囲

(始まり, 終わり)で指定

今回は、通知対象とするシグネチャIDの範囲として2000000 ~ 2099999をデフォルトに指定しました。

この範囲のシグネチャIDはET Open Rulesetsで使用しています。つまり、ET Open Rulesetsに合致したパケットに絞って通知をしてもらいます。

認証情報

SENDER_EMAIL: str = "your_email@example.com"
RECEIVER_EMAIL: str = "your_email@gmail.com"
SMTP_SERVER: str = "smtp.gmail.com"
SMTP_PORT: int = 587
SMTP_USER: str = "your_email@gmail.com"
SMTP_PASS: str = "**********************"

LINE_NOTIFY_TOKEN: str = "***************************" # Line Notify API Access Token

定数の説明は以下のとおりです。

定数

説明

備考

SENDER_EMAIL

送信者メールアドレス

RECEIVER_EMAIL

受信者メールアドレス

SMTP_SERVER

SMTPサーバアドレス

Gmailはsmtp.gmail.com

SMTP_PORT

SMTPポート

Gmailは587

SMTP_USER

認証メールアドレス

Googleアカウント メールアドレス

SMTP_PASS

認証パスワード

Googleアカウント アプリパスワード

LINE_NOTIFY_TOKEN

LINE Notifyアクセストークン

実行

さて、実際に動かしてみましょう。

しかし、ET Open RulesetsのシグネチャID範囲を通知対象としましたが、ここでは再現を容易にするためにデモ用のカスタムルールを通知対象とします。

 

通知対象とするデモ用カスタムルール↓

alert ssh any any -> 192.168.120.20 any (msg:"[Demo-SSH] SSH connection to Demo-VM"; ssh.software; content:"SSH"; nocase; sid:100001; rev:1;)
alert http any any -> any any (msg:"[Demo-HTTP-1] Access to himazin331.com"; http.host; content:"himazin331.com"; sid:100002; rev:1;)                                    
alert http any any -> any any (msg:"[Demo-HTTP-2] Access to example.com"; http.host; content:"example.com"; sid:100003; rev:1;)                                          

通知設定はこんな感じ↓

from enum import Enum

class NoticeType(Enum):
    Nothing = 0
    Email = 1
    LineNotify = 2
    Both = 3

EVE_JSONL_PATH: str = "/usr/local/var/log/suricata/eve.json"
NOTICE_TYPE = NoticeType.Both

# SIGNATURE_ID_RANGE内のシグネチャのみ通知
SIGNATURE_ID_RANGE: tuple[int, int] = (100001, 100003) # ET Open Rulesets

動作結果

  • [Demo-HTTP-1] Access to himazin331.comのアラート通知

    Demo-HTTP-1の通知
  • [Demo-HTTP-2] Access to example.comのアラート通知

    Demo-HTTP-2の通知

  • [Demo-SSH] SSH connection to Demo-VMのアラート通知

    Demo-SSHの通知

こんな感じで、メールとLINEで通知することができました!

おまけ

実際の運用を考えると、ET Open Rulesetsのすべてを通知するのはちょっと煩わしいかなと思います。

そもそもETの中にも"ET INFO"や"ET GAMES"といったさほど重要でないカテゴリもあるので、環境に応じて不要なカテゴリは除外しても良いのかなと思います。

ということで、ET Open Rulesetsのなかで検知の可能性が高いカテゴリのみにフィルタリングして通知するように、以下のようにちょっとだけ変更を加えました。

config/general.py

FILTER_SIG_CATEGORY: list[str] = [
        "Attack Response", "DNS", "DOS", "Exploit", "FTP", 
        "ICMP", "IMAP", "Malware", "NETBIOS", "Phishing", 
        "POP3", "RPC", "Shellcode", "SMTP", "SNMP", "SQL", 
        "TELNET", "TFTP", "Web Client", "Web Server", "Web Specific Apps", "WORM"
    ] # "ET xxx"のxxx。

main.py

def is_priority_sig(self, sig: str) -> bool:
    for category in PRIORITY_SIG_CATEGORY:
        if category in sig:
            return True
    return False

~ 略 ~

if lower_sig_id <= sig_id and sig_id <= upper_sig_id:
    if is_priority_sig(eve["alert"]["signature"]):  # <-- 追加
        notice_target_eve.append(eve)

FILTER_SIG_CATEGORY内のカテゴリ説明だけ書いておきます。

カテゴリ

説明

カテゴリ

説明

Attack Response

システムへの侵入活動

RPC

RPCに関する攻撃や脆弱性

DNS

DNSに関する攻撃や脆弱性

Shellcode

遠隔からのシェルコード実行

DOS

DoS攻撃の試み

SMTP

SMTPに関する攻撃や脆弱性

Exploit

特定のサービスに分類されない普遍的な攻撃

SNMP

SNMPに関する攻撃や脆弱性

FTP

FTPに関する攻撃や脆弱性

SQL

SQLに関する攻撃や脆弱性

ICMP

ICMPに関する攻撃や脆弱性

TELNET

Telnetに関する攻撃や脆弱性

IMAP

IMAPに関する攻撃や脆弱性

TFTP

TFTPに関する攻撃や脆弱性

Malware

マルウェア攻撃

Web Client

Webブラウザやcurl、wgetなどに関する攻撃や脆弱性

NETBIOS

NetBIOSに関する攻撃や脆弱性

Web Server

Webサーバに関する攻撃や脆弱性

Phishing

フィッシング活動

Web Specific Apps

特定のWebアプリケーションに関する攻撃や脆弱性

POP3

POP3に関する攻撃や脆弱性

WORM

ワーム攻撃

詳細はET Category Descriptions (PDF)を参照してください。

おわりに

いかがでしたか?通知アプリ開発編ということで自前で通知システムを実装してみただけですが...

 

実際の運用ではバックグラウンドで実行させたり、systemdでサービス起動させたりすると良いと思います。

また、実装上eve.jsonのファイルサイズが大きいと処理に時間がかかるかと思いますので、Suricataのほうで適切なログローテーション設定を行うことを強くおすすめします。

 

あと、これはそういうものなのかわからないのですが、GmailのSMTPサーバを経由してのメール送信が非常に遅く、アラート発報から約3分後にメール受信するというようなことになってます....私のネット環境の問題なのかSMTPサーバ側の問題なのかわからないのですが、もし改善策がわかる方いらっしゃいましたらTwitter(X)のDMやメールで教えてください!

 

ちなみに通知アプリのおまけ編も出そうと思っていて、少しずつですが作業を進めています。あまり新規性はないのですが...

ということで終わります。お疲れ様でした。

参考