今回はathenaを使ったログ検索を作ったため、それについて書いていこうと思います。
まずathenaってなんだ
最初に読み方ですがアテナです。めちゃくちゃかっこいい!!!僕は最初あせえなって呼んでました(英語力)
機能ですが、簡略化するとs3バケット内のデータを標準 SQL を使用して簡単に分析できるというものです。 cloudtrailのログは証跡の設定でs3バケットに送られるため、欲しいデータだけを取ってくるには最適でした。
料金ですが、 スキャンされたデータ 1 テラバイトごとに 5 USD という事です。データを圧縮することなどで、無駄を減らすこともできます。 (cloudtrailのログは圧縮されています)
今回の大まかな流れ
構成図としては以下のようになっています.
cloudtrailからのログがs3に自動保存され、別サーバからathenaを使用し、そのログを解析するようになっています。
今回は、インスタンスIDと一緒にプログラムを叩くとそのインスタンスのログを表示させるというプログラムを作成しました。
表示させる内容以下の5つです
・イベントの発生時間
・イベントのID
・イベントのリージョン
・イベントの名前
・イベントを行った人のarn
SQL文は以下のようになってます。
SELECT eventtime, eventid, awsregion, eventname,useridentity.arn FROM テーブル名 WHERE responseelements LIKE '%インスタンス名%';
準備
最初に、ログのはいっているs3バケットからテーブルを作成します。 cloudtrailのコンソールから「イベント履歴」をクリックします。上部の
Amazon Athena で高度なクエリを実行します をクリックし、s3バケットを選択しテーブルを作成します。
athenaのコンソールにでSQL文の実行や、カラムの確認などが行えます。
プログラム中身
以下に今回作成したプログラムを載せます。
import boto3 import sys from time import sleep from datetime import datetime from datetime import timedelta import pandas as pd import csv import requests key = { "access_key": 'AWSのアクセスキー', "secret_key": 'AWSのシークレットキー', "table": '検索するテーブル', "database": '使用するデータベース', "output": 's3に保存されてるログのパス' } REGION = '自分のリージョン' SQL1 = "SELECT eventtime, eventid, awsregion, eventname,useridentity.arn FROM " SQL2 = " WHERE responseelements LIKE '%" SQL3 = "%';" def get_athena(client, SQL, DATABASE, OUTPUT): #クエリの実行 query = client.start_query_execution( QueryString = SQL, QueryExecutionContext = { 'Database' : DATABASE }, ResultConfiguration={ 'OutputLocation' : OUTPUT } ) #実行クエリのID取得 queryid = query['QueryExecutionId'] #実行が終わるまで待機 while True: get = client.get_query_execution(QueryExecutionId = queryid) state = get['QueryExecution']['Status']['State'] if state in ['SUCCEEDED']: break elif state in ['FAILED', 'CANCELLED']: break else: sleep(1) #結果の取得 response = client.get_query_results( QueryExecutionId = queryid ) rows = response['ResultSet']['Rows'] dataset = [] for row in rows[1:]: datas = [] for j in range(len(row['Data'])): data = row['Data'][j]['VarCharValue'] datas.append(data) dataset.append(datas) if dataset != []: #role以下の抜き出しと時間の訂正 for i in range(len(dataset)): #抜き出し name = dataset[i][4].split('role') if len(name) == 1: dataset[i][4] = name[0] else: dataset[i][4] = name[1] #時間の訂正 time = datetime.strptime(dataset[i][0], '%Y-%m-%dT%H:%M:%SZ') + timedelta(hours = 9) dataset[i][0] = time.strftime('%Y-%m-%d %H:%M:%S') #時間順にソート dataset.sort(key=lambda x:datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S')) df = pd.DataFrame(dataset,columns=['eventtime', 'eventid', 'awsregion', 'eventname','arn']) print(df) else: print("not found") ####main##### args = sys.argv INSTANCE_ID = args[1] access_key = key['access_key'] secret_key = key['secret_key'] SQL = SQL1 + key['table'] + SQL2 + INSTANCE_ID + SQL3 DATABASE = key['database'] OUTPUT = key['output'] client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=REGION) get_athena(client, SQL, DATABASE, OUTPUT)
role以下の切り抜きはほぼデータを見て行いました、、、(もうちょいスマートにかきたかった) keyをリストにすることで複数のアカウントから調べることもできます。
テスト
ここまできたらテストですね!!!!!!!!!!!!!!!
python ec2_instance.py インスタンスID
上記の実行結果を画像でのっけます!
入力したインスタンスIDのログを表示することができました!!! 表示数を増やしたい場合などはSQLなどをいじればすぐできると思います!
是非、あせえなを使った良いログライフを!!!!