Amazon Inspectorの脆弱性診断結果の差分を通知する

Amazon Inspectorは標準で脆弱性診断結果をSNSでメールを送信してくれますが、前回の診断と被っている箇所も送信してしまいます。

アップデートができない環境だったりと、修正することのできない脆弱性がある場合や、同じ脆弱性診断結果を送信してほしくない場合は自作する必要があります。

また、デフォルトの通知や、CSV形式の出力はとても見づらいです。

なので、見やすく、前回の差分を通知(メールしてくれる)ものを作ってみました!

手順はこのように進めていきます。

  • ロールの作成
  • lambda関数の作成
  • snsトピックの作成
  • Amazon inspectorの作成

今回、Lambda関数はPython3系で作成します。

通知(メール)送信先メールアドレス用のSNSトピックは各自で作成済みとします。


ロールの作成

Lambda関数に適用するロールを作成します。

[AWSサービス]と[Lambda]を選択し次へ

[AmazonInspectorReadOnlyAccess]と[AmazonSNSFullAccess]をアタッチします。


Lambda関数の作成

名前などを適当に入力、先程作成したロールを選択し作成します。

ソースコードはこちら

環境に合わせて、メール送信用のTOPIC_ARN変数[lambda_handler(event, context)]の関数名を変更してください。

import json
import boto3

TOPIC_ARN = u'arn'

# メール送信(SNS)
def mail(subject, msg):

    client = boto3.client('sns')

    request = {
        'TopicArn': TOPIC_ARN,
        'Message': msg,
        'Subject': subject
    }

    response = client.publish(**request)
    return response


# 診断結果のARNを取得
def get_findingArns(assessmentRunArn):
    client = boto3.client('inspector')

    findingArns = []
    nextToken = ""

    while(True):
        if(nextToken != ""):
            response = client.list_findings(
                assessmentRunArns=[
                    assessmentRunArn,
                ],
                maxResults=500,
                nextToken=nextToken
            )
        else:
            response = client.list_findings(
                assessmentRunArns=[
                    assessmentRunArn,
                ],
                maxResults=500
            )

        if("nextToken" not in response):
            for arn in response["findingArns"]:
                findingArns.append(arn)
            break
        else:
            for arn in response["findingArns"]:
                findingArns.append(arn)
            nextToken = response["nextToken"]

    return findingArns


# main
def lambda_handler(event, context):
    # debug用
    #mail("event json", json.dumps(event, indent=4))

    try:
        client = boto3.client('inspector')

        # Arn取得
        obj = event["Records"][0]["Sns"]["Message"]
        obj = json.loads(obj)
        assessmentRunArn = obj["run"]
        assessmentTemplateArn = assessmentRunArn.rsplit("/", 2)[0]

        # 実行したテンプレートの一つ前の診断結果のARNを取得
        assessmentRunArns = []
        nextToken = ""
        while(True):
            if(nextToken != ""):
                response = client.list_assessment_runs(
                    assessmentTemplateArns=[
                        assessmentTemplateArn,
                    ],
                    maxResults=500,
                    nextToken=nextToken
                )
            else:
                response = client.list_assessment_runs(
                    assessmentTemplateArns=[
                        assessmentTemplateArn,
                    ],
                    maxResults=500
                )

            if("nextToken" not in response):
                for arn in response["assessmentRunArns"]:
                    assessmentRunArns.append(arn)
                break
            else:
                for arn in response["assessmentRunArns"]:
                    assessmentRunArns.append(arn)
                nextToken = response["nextToken"]

        # get_findingArns
        findingArns = get_findingArns(assessmentRunArn)
        size = 100
        output = ""
        describe_findings = []
        findingArns_split = [findingArns[x:x + size] for x in range(0, len(findingArns), size)]
        for findingArns_100 in findingArns_split:
            describe_findings = describe_findings + client.describe_findings(findingArns=findingArns_100, locale='EN_US')["findings"]

        # 初回実行だったら
        if(len(assessmentRunArns) == 1):
            for finding in describe_findings:
                title = finding["title"]
                output = output + title + "  :  " + finding["arn"] + "\n"
            mail('Inspector alert', output)
            return output                        
        else:
            before_assessmentRunArn = assessmentRunArns[-2]

        # 初回実行以外は一回前の診断結果と比較
        # get_before_findingArns
        before_findingArns = get_findingArns(before_assessmentRunArn)
        before_describe_findings = []

        before_findings_split = [before_findingArns[x:x + size] for x in range(0, len(before_findingArns), size)]
        for before_findingArns_100 in before_findings_split:
            before_describe_findings = before_describe_findings + client.describe_findings(findingArns=before_findingArns_100, locale='EN_US')["findings"]

        before_title = []
        for finding in before_describe_findings:
            before_title.append(finding["title"])

        for finding in describe_findings:
            title = finding["title"]

            if(title not in before_title):
                output = output + title + "  :  " + finding["arn"] + "\n"

        # アラート
        if(output != ""):
            mail('Inspector alert', output)
        else:
            output = "Nothing to alert."
            mail('Inspector alert', output)

        return output

    except:
        import traceback
        output = traceback.format_exc()
        mail('Inspector error', output + "\n\n" + json.dumps(event, indent=4))
        return output


SNSトピックの作成

ここではLambda関数を実行するためのSNSトピックを作成します。


Amazon Inspectorの設定

Amazon Inspectorの使い方などはこちら
初心者でも簡単にできるAmazon Inspectorによる脆弱性診断
Amazon Inspectorとは

設定の際、SNSトピックに先程作成したトピックを設定します。
また、イベントは以下のようにします。
(2018/11/14 報告された結果だと、Inspectorの結果が1つずつメールで送信されてしまうので、イベントを[実行完了]に変更しました)

Amazon Inspector作成時にトピックのエラーが出たら
[SNS] > [トピック] > [先程作成したトピック] > [その他のトピックの操作] > [トピックのポリシーの編集]
と進み、以下を設定します。

値はエラーで出てきた12桁の数字を設定してください。

これで Amazon Inspector → Lambda と実行することができ、同じテンプレートの1回前との差分を通知してくれます。

テンプレート作成時などlambdaが叩かれる場合はエラーメールが飛ぶと思いますが、無視していただいて大丈夫です。(実装がめんどくさかったので笑)

inspector用のboto3があるので色々できます。

ドキュメントはこちら