はじめに
サーバーのインシデントが発生したとき、まずサーバーのログファイルを確認しますが、膨大な量のログを1つ1つ人の目で確認するのは途方もない時間がかかってしまいます。
そこで我々は、MicrosoftのAzureの機能を使ってLLMにログを解析してもらう仕組みを作りました。
サーバー管理者は3個のコマンドでログの異変を調査することができます。
目次
ログ解析の流れ
Azureの機能を使ってログを解析するまでの流れを示します。
今回は、ミドルウェアがnginxのAWSサーバーのアクセスログとエラーログを解析することを目標とします。
- サーバーからnginxのアクセスログとエラーログをローカルの端末に転送する。
- 取得したログファイルをAzureのストレージにアップロードする。
- AI SearchとAzure OpenAIにクエリを投げて、ログを解析する。
これらの手順をひとつひとつ解説します。
ローカルの端末のdirectoryの構成
. ├── logs/ │ ├── access_log-%Y%m%d%H%M%S.txt │ └── error_log-%Y%m%d%H%M%S.txt ├── src/ │ ├── log_auto.sh │ ├── upload_to_blob.py │ └── analyze.py ├── ssh_keys/ │ └── hoge.pem └── .env
.envファイルの環境変数
AZURE_STORAGE_CONNECTION_STRING="" AZURE_STORAGE_CONTAINER_NAME="" AZURE_OPENAI_API_KEY="" AZURE_OPENAI_API_VERSION="" AZURE_OPENAI_ENDPOINT="" AZURE_SEARCH_ENDPOINT="" AZURE_SEARCH_INDEX_NAME="" AZURE_SEARCH_KEY="" AZURE_OPENAI_DEPLOYMENT_NAME=" SSH_KEY="hoge.pem" SSH_USER="ec2-user" SERVER_IP="XX.XX.XX.XX" LOCAL_DIR="logs/" REMOTE_HOME_DIR="/home/ec2-user/" REMOTE_LOG_DIR="/var/log/nginx/"
サーバーからログファイルを取得する
sshコマンドとscpコマンドを使って、AWSサーバーからログファイルをローカルの端末に転送するコードを下に示します。
#!/bin/bash CUR_DIR=$(cd $(dirname $0); pwd) # envから環境変数を読み込む set -a source ${CUR_DIR}/../.env set +a # ssh接続して/home/ec2-user/にログファイルをコピーする now=$(date +%Y%m%d%H%M%S) PATH_TO_SSH_KEY="${CUR_DIR}/../ssh_keys/${SSH_KEY}" ssh -i ${PATH_TO_SSH_KEY} ${SSH_USER}@${SERVER_IP} "sudo sh -c 'grep $(date +%d/%b) ${REMOTE_LOG_DIR}/access.log > ${REMOTE_HOME_DIR}/accesslog-${now}.txt'" ssh -i ${PATH_TO_SSH_KEY} ${SSH_USER}@${SERVER_IP} "sudo sh -c 'grep $(date +%m/%d) ${REMOTE_LOG_DIR}/error.log > ${REMOTE_HOME_DIR}/errorlog-${now}.txt'" if [ $? -ne 0 ]; then echo "SSH connection failed or command execution failed." exit 1 fi # scpコマンドでローカルのディレクトリに転送する logfile=$(ssh -i ${PATH_TO_SSH_KEY} ${SSH_USER}@${SERVER_IP} "ls -1 ${REMOTE_HOME_DIR}/*log-${now}.txt") if [ -z "$logfile" ]; then echo "No log files found for the current date." exit 0 fi for remote_filepath in $logfile do echo "Transferring ${remote_filepath} to ${CUR_DIR}/../${LOCAL_DIR}" filename=$(basename ${remote_filepath}) # ファイルの名前だけにする scp -i ${PATH_TO_SSH_KEY} ${SSH_USER}@${SERVER_IP}:${remote_filepath} "${CUR_DIR}/../${LOCAL_DIR}/${filename}" if [ $? -ne 0 ]; then echo "Failed to transfer ${remote_filepath}." exit 1 fi done
nginxのアクセスログやエラーログは/var/log/nginx/
にあるのですが、root
権限下のディレクトリなので、一度、一般ユーザーのホームディレクトリ (AWSサーバーの場合、/home/ec2-user/
)の下にコピーします。
このAWSサーバーでは、ログは1週間単位で保存されているのですが、今回はインシデントが発生した日のログが欲しいため、grep
コマンドを使って抽出しています。
この抽出方法は、ログの日付の表記方法に依存するため、ログファイルごとに抽出する文字列を変える必要があります。
このコマンドを実行すると、ローカルの端末のlogs
ディレクトの下にログが転送されます。
ログファイルをAzureのストレージにアップロードする
ここでは、AWSサーバーのログファイルをPythonのスクリプトを使い、Azureのストレージにアップロードする方法を説明します。
まず、Azureにログインして適切なサブスクリプションとリソースグループにてストレージアカウントを作ってください。
ストレージアカウントの作り方
- Azureのホーム画面から「ストレージアカウント」を選択し、「作成」に進む。
- プライマリ サービスを「Azure Blob Storage または Azure Data Lake Storage Gen 2」、パフォーマンスを「Standard」、冗長性を「ローカル 冗長ストレージ(LRS)」にして、作成する。
それでは、Pythonのスクリプトを使って、ストレージアカウントにログファイルをアップロードします。
まずは必要なパッケージをインストールします。
pip install azure-storage-blob python-dotenv
次に下記のPythonスクリプトを実行します。
import os import sys from azure.storage.blob import BlobServiceClient from azure.core.exceptions import ResourceExistsError from dotenv import load_dotenv # .env を読み込む current_dir = os.path.dirname(__file__) path_to_env = os.path.join(current_dir, '../.env') load_dotenv(path_to_env) # 環境変数から取得 CONNECTION_STRING = os.getenv("AZURE_STORAGE_CONNECTION_STRING") CONTAINER_NAME = os.getenv("AZURE_STORAGE_CONTAINER_NAME") def upload_file_to_blob(local_file_path): if not os.path.isfile(local_file_path): print(f":❌: Error: '{local_file_path}' does not exist.") return blob_name = os.path.basename(local_file_path) blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING) try: container_client = blob_service_client.create_container(CONTAINER_NAME) print(f"コンテナ '{CONTAINER_NAME}' を作成しました。") except ResourceExistsError: # print(f"コンテナ '{CONTAINER_NAME}' はすでに存在します。") container_client = blob_service_client.get_container_client(CONTAINER_NAME) blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=blob_name) with open(local_file_path, "rb") as data: blob_client.upload_blob(data, overwrite=True) print(f":✅: Uploaded: {local_file_path} → Blob '{CONTAINER_NAME}/{blob_name}'") if __name__ == "__main__": if len(sys.argv) < 2: # ファイルを一つずつでなく複数アップロードする # print(sys.argv) print("Usage: python upload_to_blob.py <file_path>") else: for i in range(1, len(sys.argv)): upload_file_to_blob(sys.argv[i])
下記のコマンドで実行できます。
$ python upload_to_blob.py logs/*
このスクリプトでは、ストレージアカウントに.env
ファイルで指定した名前のコンテナ名がなければ、作成します。
その後、logs/
の下にあるログを指定したコンテナにアップロードします。
ファイルのアップロードがうまくいくと、画面のように新しくコンテナが作られ、その中にファイルが存在することを確認できます。
AI SearchとAzure OpenAIにクエリを投げて、ログを解析する
最後にAI SearchとAzure OpenAIに質問を投げて、ログの解析結果を取得する方法を説明します。
まず、ストレージアカウントを作成したのと同様に、Azure OpenAIから新しくインスタンスを作成しください。
インスタンスを作成できたら、AI Foundryよるgpt-4
とtext-embedding-3-small
の2種類のモデルをデプロイしてください。
gpt-4
はサーバー管理者がLLMと対話する際に使うモデル、text-embedding-3-small
はRAGを構築する際にアップロードしたファイルをベクトル化するのに用います。
ここでは、AI Searchとストレージアカウントを紐付ける方法を説明します。
AI Searchとストレージアカウントを紐付ける方法
- AI Searchの画面から、「データのインポートとベクター化」を選択し、Azure Blob Storage、RAGへと進んでください。
- 「データへの接続」で、サブスクリプション、ストレージ アカウント、BLOB コンテナーに適切な情報を入力してください。ストレージ アカウントは先ほど作ったアカウントと同じ名前、BLOB コンテナーは
.env
ファイルで指定したものと同じものを記入してください。
- 「テキストをベクトル化する」で、Azure OpenAI Serviceは先ほど作成した、Azure OpenAIのインスタンスの名前を入力してください。モデル デプロイは
text-embedding-3-small
を指定してください。
「画像をベクター化してエンリッチする」はスキップしてもいいです。
「詳細設定」でセマンティック ランカーを有効にするにチェックを入れてください。こうすることで日本語の検索にも上手く対応してくれます。また、インデックス作成のスケジュールを毎時間にすることで、新しくファイルをアップロードした場合にも対応できるようにします。
- [レビューと作成]にて、オブジェクト名のプレフィックスを好きな名前に設定してください。この名前は、インデクサーの名前に反映されるため、
.env
ファイルのAZURE_SEARCH_INDEX_NAME
変数も変更してください。
ここまでできたら、Pythonプログラムを実行します。 先ほどと同様に、必要なライブラリをインストールします。
pip install openai azure-search-documents
AzureのLLMにログの解析結果を聞くスクリプトを下に示します。
# Azure OpenAIに繋げる import os from openai import AzureOpenAI from dotenv import load_dotenv # RAG処理できるようにAzure Searchとも繋げる from azure.search.documents import SearchClient from azure.core.credentials import AzureKeyCredential import asyncio # asyncioをインポート # .envファイルから環境変数を読み込む current_dir = os.path.dirname(__file__) path_to_env = os.path.join(current_dir, '../.env') load_dotenv(path_to_env) # Azure OpenAIに接続するためのクライアント作成 client = AzureOpenAI( api_key=os.getenv("AZURE_OPENAI_API_KEY"), api_version=os.getenv("AZURE_OPENAI_API_VERSION"), azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), ) # Azure Searchに接続するためのクライアント作成 search_client = SearchClient( endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"), index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"), credential=AzureKeyCredential(os.getenv("AZURE_SEARCH_KEY")) ) # ドキュメント検索(与えられた質問(query)をもとに検索) def search_documents(query: str) -> str: docs = [] try: results = search_client.search(query) results_list = list(results) for result in results_list: chunk = result.get("chunk", "コンテンツなし (フィールド 'content' が見つかりません)") # またはログの本文が入るフィールド名 docs.append(chunk) print(f"--- 抽出されたドキュメント数: {len(docs)} ---") return "\n\n".join(docs) except Exception as e: print(f"エラー: Azure AI Searchでの検索中に例外が発生しました: {e}") return "" async def chat_with_openai(user_input: str) -> str: try: # システムメッセージを入れる messages = [{"role": "system", "content": "あなたはログ解析のアシスタントです"}] # ユーザーからの質問を追加 (RAGコンテキスト生成のため) messages.append({ "role": "user", "content": user_input }) # ユーザーのqueryを元にインデックス検索 context = search_documents(user_input) # RAGの結果をプロンプトに追加 if context: prompt = f"""不正なアクセスや攻撃を検出してください。検出した不正アクセスや攻撃の個数も教えてください。検出した場合、すべての不正アクセスや攻撃について、その種類、接続元のIPアドレス、なぜそのように判断したか、を詳しく説明してください。ログ:{context},質問:{user_input} """ else: # 検索結果がない場合は、ログがないと明言させる prompt = f"""ユーザーの意図するログがありません。「該当するログがありません」と明記してください。 質問: {user_input} """ messages.append({"role": "user", "content": prompt}) print("\n--- Azure OpenAI にリクエスト送信中 ---") response = client.chat.completions.create( model=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"), messages=messages, temperature=0.7, max_tokens=3000 ) #回答部分を抽出 reply = response.choices[0].message.content return reply except Exception as e: print(f"エラー: OpenAI API呼び出し中に例外が発生しました: {e}") return f"OpenAI エラー: {e}" # --- スクリプト実行部分 --- async def main(): print("ログ解析アシスタントへようこそ!") print("終了するには 'exit' と入力してください。") while True: user_input = input("\n質問を入力してください: ") if user_input.lower() == 'exit': break response_text = await chat_with_openai(user_input) print("\n--- Azure OpenAI からの応答 ---") print(response_text) if __name__ == "__main__": asyncio.run(main())
ターミナルで下記のコマンドを実行することで、ログ解析の結果を得ることができます!
$ python src/analyze_log.py ログ解析アシスタントへようこそ! 終了するには 'exit' と入力してください。 質問を入力してください: nginxのログを解析して --- 抽出されたドキュメント数: 12 --- --- Azure OpenAI にリクエスト送信中 --- --- Azure OpenAI からの応答 --- 与えられたnginxログから、不正アクセスや攻撃と思われるアクセスを解析します。 --- ## 1. 不正アクセス・攻撃の検出結果 **検出した不正アクセス・攻撃の件数:** **8件** --- ## 2. 詳細な解析 ### (1) WordPressログインページへの大量アクセス(ブルートフォース/辞書攻撃の可能性) #### アクセス例 #### 種類 - #### 接続元IP - #### 判断理由 - --- ### (2) .envファイルへのアクセス(情報漏洩狙い) #### アクセス例 #### 種類 - #### 接続元IP - #### 判断理由 - --- ### (3) その他WordPress関連ディレクトリ探索 #### アクセス例 #### 種類 - --- ### (4) 正規Bot・SEOクローラー等 #### 例 - Googlebot, Bingbot, DotBot, DataForSeoBot, Amazonbot, ClaudeBot, MJ12bot, UptimeRobot, Thinkbot など #### 判断 - これらはUser-AgentやIPからみて正規ボットが多い。 - 通常のクロール頻度・挙動であり、攻撃とはみなさない。 --- ### (5) その他 #### 下記のようなアクセスは攻撃に該当しない - 通常ページ・タグ・フィード・記事などへのアクセス - 検索クエリのURLパラメータ(明らかなSQLiやXSSはなし) --- ## 3. 検出サマリ ### 検出した不正アクセス・攻撃の個数 --- ## 4. 詳細まとめ ### 1. WordPressログインページへの攻撃 - **IP**: - **理由**: `/wp-login.php`への直接アクセス(403が返っている)、複数IPからの短時間連続アクセス - **種類**: ログインブルートフォース/辞書攻撃 ### 2. .envファイル探索攻撃 - **IP**: - **理由**: `/laravel/core/.env`など複数の`.env`ファイルパスへのアクセス。自動化スキャンの典型。 - **種類**: サーバー設定ファイル探索・情報漏洩狙い --- ## 5. 補足 - 正規のボットアクセスは多いですが、攻撃とは判断しません。 - `/wp-login.php`や`.env`ファイル探索は明確な攻撃の兆候です。 - 今回のログ範囲ではSQLインジェクションやXSS等の痕跡は見られませんでした。 --- **ご質問や詳細分析が必要な場合は追加でお知らせください。** 質問を入力してください: exit
まとめ
今回は、Azureのサービスを使って、ミドルウェアがnginxのAWSサーバーを解析する方法を紹介しました。
3種類のスクリプトを実行することで、サーバーからログを取得し、Azureのストレージにアップロードし、LLMに解析結果を聞くことができました。
このシステムを使うことで、ログ解析の時間を短縮することができます。
また、環境変数やスクリプトの一部を変更するだけで、他のログの解析も行うことができます。