VPCフローログをAWS Elasticsearch Servicesで可視化するまで

サーバやネットワークの通信等の生ログを分析・テキスト検索し可視化するサービスの1つとして、Elasticsearchがありますが、今回はAWS環境において通信ログ(VPCフローログ)を一度DB(DynamoDB)に投入した上で、Elasticsearch(Elasticsearch Service)で可視化するまでを実施していきたいと思います。

今回利用するAWSサービスと構成

  1. VPCフローログ
  2. Cloudwatch logs
  3. DynamoDB
  4. Cognito (ユーザプール/IDプール)
  5. Elasticsearch Service(AmazonES)
  6. lambda
  7. IAM

上記のサービスを組み合わせてVPCフローログのログ分析と可視化を実現していきます。
構成は以下のようなイメージで実装しています。フローログはDynamoDB1カ所に入れた上でAmazon ESで可視化してみます。cloudwatch logsから直接AmazonESで可視化も出来ますが、ログを別のところでも使いたいのでDynamoDBにも入れておきます。 f:id:tamo_bbbreak:20180808000344p:plain


1. VPCフローログの設定

VPCフローログがCloudwatch logsに格納されるように設定します。

f:id:tamo_bbbreak:20180803144932p:plain

f:id:tamo_bbbreak:20180808002205p:plain

f:id:tamo_bbbreak:20180808002219p:plain

2. Cloudwatch logsの設定

cloudwatch logsのロググループを作成しておきます。
全作業が完了したらlambda関数の実行ログとVPCフローログがそれぞれ書き込まれていることを確認しておきます。
・/aws/lambda/[cloudwatchlogs->dynamodbにレコード格納するlambda関数名] (lambdaの実行ログ)
・/aws/lambda/[dynamodb->ESドメインにレコード格納するlambda関数名] (lambdaの実行ログ)
・/aws/vpc/[vpcフローログ名] (今回取得したい生ログそのもの)

3. DynamoDBの設定

まずはテーブルのみを作成します。
項目はlambdaからレコードを格納する際に自動生成されるため今回は定義しません。

f:id:tamo_bbbreak:20180803145202p:plain

4. Cognit (ユーザプール/IDプール)の設定

Elasticsearch ServiceにおけるKibanaへのアクセスに、Cognito認証を有効化します。
ESドメインを設定する前に事前定義をしておきます。
ESドメインをパブリックアクセスさせることも可能ですがVPCアクセスのみに限定します。また今回はCognitoのユーザプールに登録したアカウントで認証します。

4-1.ユーザプールの設定

  1. まずはユーザプールを作成。ユーザプールIDとアプリクライアントIDはIDプール作成時に指定します。
    f:id:tamo_bbbreak:20180803201059p:plain
  2. 今回はデフォルト設定で進めます。ただしアプリクライアントの追加は実行します。 f:id:tamo_bbbreak:20180803230509p:plain
  3. アプリクライアントの追加が完了したらユーザプールを作成します。 f:id:tamo_bbbreak:20180803201134p:plain
  4. ユーザプール作成後にドメイン名を設定します。この設定を行なわないとESドメインを作成できません。(VPCに展開する場合) f:id:tamo_bbbreak:20180803201230p:plain 作成後、全般設定からユーザプールID、アプリクライアントからアプリクライアントIDを確認。(IDプール作成時に指定が必要)

4-2.IDプールの設定

  1. 続いてIDプールを作成。IDプール名を指定。 f:id:tamo_bbbreak:20180803202222p:plain
  2. 認証プロバイダーはCognitoを指定します。その際にユーザプールのIDとアプリクライアントIDを指定。
    f:id:tamo_bbbreak:20180803202243p:plain
  3. IDプール作成後、以下2種類の IAMロールが作成されます。それぞれCognitoで認証されたユーザと認証されていないユーザのようです。Cognitoで認証されたユーザは以下のロールの権限でアクセスされるようなので必要に応じて ESドメインへのアクセスポリシーを設定します。
    Cognito[IDプール名]Auth_Role
    Cognito
    [IDプール名]Unauth_Role

Cognito_[IDプール名]Auth_Roleに以下のインラインポリシーを設定。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "es:ESHttpGet",
            "Resource": "arn:aws:es:[[REGION_NAME]]:[[ACCOUNT_ID]]:domain/[[ES_DOMAINNAME]]/*"
        }
    ]
}

5. Elasticsearch Serviceの設定

ドメイン名、Elasticsearchバージョン、インスタンスタイプ、展開先VPC、セキュリティグループやCognito認証などの設定を行います。
ESドメイン作成後インデックスの定義をしておきます。タイプの定義についてはレコード格納と同時に自動定義可能ですが、インデックス定義と合わせてタイプの定義も行います。
(タイプ定義せずにlambdaからDynamoDBの新規レコードをそのまま格納すると、カラムが入れ子構造になったり数値が文字列でタイプが定義されてしまうので、タイプも定義しておきます。)
ESドメインにおけるインデックスとタイプの定義は別記事でまとめたいと思います。

  1. まず ESドメイン名とElasticsearchバージョンを指定。 f:id:tamo_bbbreak:20180803231643p:plain
  2. 続いてリソース設定を実施。インスタンス数/タイプ、ディスクの設定を実施。 f:id:tamo_bbbreak:20180803231714p:plain f:id:tamo_bbbreak:20180803231930p:plain f:id:tamo_bbbreak:20180803231946p:plain
  3. 次にネットワークの設定。展開先のVPC、サブネットとセキュリティグループを指定。 f:id:tamo_bbbreak:20180803232059p:plain
  4. Kibana認証の設定を行います。先ほど作成したCognitoのユーザプールとIDプールを指定。 f:id:tamo_bbbreak:20180803232110p:plain
  5. 最後にESドメインにアクセスできる AWSアカウントorIAMユーザを指定。 f:id:tamo_bbbreak:20180803232123p:plain f:id:tamo_bbbreak:20180803232239p:plain
  6. 最終確認です。ESドメインを作成しませう。 f:id:tamo_bbbreak:20180803232253p:plain

  7. ESドメイン作成後、アプリクライアントのコールバック・サインアウトのURLを設定します。URLはESドメイン全般設定のKibanaのURLを指定。 f:id:tamo_bbbreak:20180804003249p:plain

6. lambdaの設定・コード

今回用意するlambdaのコードは2種類あります。
1つ目はCloudwatch logsからDynamoDBにレコードを格納するコードです。
2つ目はDynamoDBに格納されたレコードをESに格納するコードです。
Cloudwatch logsからDynamoDBにレコードを格納するlambda関数はリージョン毎に作成する必要があります。複数リージョンにVPCを構成する場合、lambda関数もリージョン毎に定義しておきます。
os.getenv関数のところはlambda側で環境変数の定義が必要なのでそれぞれ定義します。(リージョン名、テーブル名などです )

1.Cloudwatch logsからDynamoDBにレコードを格納するコード

import boto3
import base64
import json
import zlib
import os
import json
from datetime import datetime
import re


def lambda_handler(event, context):
    rawdata = event['awslogs']['data']
    data = zlib.decompress(base64.b64decode(rawdata), 16+zlib.MAX_WBITS)
    data_json = json.loads(data)
    if data_json['logGroup']:
        for item in data_json['logEvents']:
            print(item)
            record = match_vpclog(item['id'], item['message'], item['timestamp'])
            #print(record)
            write_dynamo(record)
    return


def write_dynamo(item):
    region_name = os.getenv('REGION_NAME')
    dynamodb = boto3.resource('dynamodb', region_name = region_name)
    table = dynamodb.Table(os.getenv('TABLE_NAME'))
    table.put_item(Item = item)
    return

#log message format '2 unknown eni - - - - - - - number number - NODATA' is unsuppoted
def match_vpclog(id, message, timestamp):
    pattern = r'(?P<version>[^ ]+) (?P<accountid>[^ ]+) (?P<interfaceid>[^ ]+) (?P<srcaddr>[^ ]+) (?P<dstaddr>[^ ]+) (?P<srcport>[^ ]+) (?P<dstport>[^ ]+) (?P<protocol>[^ ]+) (?P<packets>[^ ]+) (?P<bytes>[^ ]+) (?P<start>[^ ]+) (?P<end>[^ ]+) (?P<action>[^ ]+) (?P<logstatus>[^ ]+)'
    match = re.search(pattern, message)
    return {
        'id' : id,
        'version' : int(match.group("version")),
        'account-id' : str((match.group("accountid"))),
        'interface-id' : str(match.group("interfaceid")),
        'srcaddr' : str(match.group("srcaddr")),
        'dstaddr' : str(match.group("dstaddr")),
        'srcport' : int(match.group("srcport")),
        'dstport' : int(match.group("dstport")),
        'protocol' : int(match.group("protocol")),
        'packets' : int(match.group("packets")),
        'bytes' : int(match.group("bytes")),
        'start' : int(match.group("start")),
        'end' : int(match.group("end")),
        'action' : str(match.group("action")),
        'log-status' : str(match.group("logstatus")),
        'timestamp' : timestamp,
        'time' : datetime.fromtimestamp(int(match.group('start'))).strftime('%y-%m-%d %H:%M:%S')
    }

データソースはCloudwatch logsの/aws/vpc/[vpcフローログ名]にします。 lambda関数にREGION_NAME,TABLE_NAMEそれぞれ環境変数を指定。

2.DynamoDBに格納されたレコードをESに格納するコード

import boto3
import requests
from requests_aws4auth import AWS4Auth
from datetime import datetime
import os

region =  os.getenv('REGION_NAME')
service = os.getenv('SERVICE_NAME')
host = os.getenv('HOSTNAME')
index = os.getenv('INDEX_NAME')
type = os.getenv('TYPE_NAME')
url = host + '/' + index + '/' + type + '/'
headers = {"Content-Type" : "application/json"}

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 
        service, session_token=credentials.token)

def lambda_handler(event, context):
    for record in event['Records']:
        id = record['dynamodb']['Keys']['id']['S']
        if record['eventName'] != 'REMOVE':
            value = record['dynamodb']['NewImage']
            obj = get_object_fromJson(value)
            insert_record(id, obj)
    return

def insert_record(id, json):
    r = requests.put(url + id, auth = awsauth, json=json, headers = headers)

def get_object_fromJson(data):
    return {
        'id' : str(data['id']['S']),
        'version' : int(data['version']['N']),
        'account-id' : str(data['account-id']['S']),
        'interface-id' : str(data['interface-id']['S']),
        'srcaddr' : str(data['srcaddr']['S']),
        'dstaddr' : str(data['dstaddr']['S']),
        'srcport' : int(data['srcport']['N']),
        'dstport' : int(data['dstport']['N']),
        'protocol' : int(data['protocol']['N']),
        'packets' : int(data['packets']['N']),
        'bytes' : int(data['bytes']['N']),
        'start' : int(data['start']['N']),
        'end' : int(data['end']['N']),
        'action' : str(data['action']['S']),
        'log-status' : str(data['log-status']['S']),
        'timestamp' : int(data['timestamp']['N']),
        'time' : datetime.fromtimestamp(int(data['start']['N']),).strftime("%y-%m-%d %H:%M:%S") 
    }

requests,requests_aws4authがあるのでpipでインストールのうえ、zipで固めておきます。 ESドメインVPC内にあるので、このlambda関数はVPCアクセスできる必要があります。
lambda関数にREGION_NAME,SERVICE_NAME,HOSTNAME,INDEX_NAME,Type_NAMEそれぞれ環境変数を指定。 HOSTNAMEはESドメインVPCエンドポイントのURLです。

7. IAMの設定

今回は以下4つのポリシーを設定したIAMロールを定義。
一番最後に記載していますが一番最初にやることですが・・・。
1. AWSLambdaBasicExecutionRole (AWS管理ポリシー)
2. AWSLambdaVPCAccessExecutionRole (AWS管理ポリシー)
3. DynamoDBテーブルへのCRUD (インラインポリシー)
4. Elasticsearch ServiceドメインへのCRUD (インラインポリシー)
5. Cloudwatch Logs(VPCフローログ)へのCR (インラインポリシー)

3.のインラインポリシーの設定

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": [
            "dynamodb:BatchGetItem",
            "dynamodb:BatchWriteItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:GetItem",
            "dynamodb:Query",
            "dynamodb:UpdateItem",
            "dynamodb:GetShardIterator",
            "dynamodb:DescribeStream",
            "dynamodb:ListStreams",
            "dynamodb:GetRecords"
        ],
        "Resource": [
            "arn:aws:dynamodb:*:*:table/flowlogtab",
            "arn:aws:dynamodb:*:*:table/flowlogtab/stream/*"
        ]
    }
}

4.のインラインポリシーの設定

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttp*"
            ],
            "Resource": "arn:aws:es:*:*:domain/vpcflowlog-es/*"
        }
    ]
}

5.のインラインポリシーの設定

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:log-group:/aws/myvpc/flowlogs:*",
                "arn:aws:logs:*:*:log-group:/aws/myvpc/flowlogs"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "*"
        }
    ]
}

上記とは別に該当ロールに以下の信頼エンティティを設定。
lambda.amazonaws.com
vpc-flow-logs.amazonaws.com
ec2.amazonaws.com






<参考サイト>
VPC フローログ - Amazon Virtual Private Cloud
Amazon Elasticsearch Service ドメインの作成と設定 - Amazon Elasticsearch Service
Amazon Elasticsearch Service にストリーミングデータをロードする - Amazon Elasticsearch Service




誤り漏れや改善点があれば適宜修正していきたいと思います。(lambda関数の設定とコードそのもの、AmazonESのインデックス定義など説明不足のところもあるので・・・。)
にしてもマネジメントコンソールからの設定はわかりやすく簡単だけど記事に投稿する場合は幅取りますね・・・。AWS CLIによる設定方法も確認してみます。
ログ可視化の背景として、AWS VPCとAzureVNETをOpenVPNで拠点間接続してみたのですが、VPCからAzureVNET方向への通信においてICMPパケットの疎通はできたがTCP/UDPパケットが疎通できない状況だったのでまずはパケット疎通状態からの確認と言う事で取り組んでいます。(snatとかの問題なんかなぁ。)
そちらもまとめたいと思います。