こんにちは( ̄^ ̄)ゞ
対人戦に疲れ、サーモンランばかりやっている大久保です。
シェケナダムで伝説200を目指してます。
今回は「~パート2~データベースアクティビティストリームの設定方法」になります。
- ~パート1~データベースアクティビティストリームとは
- ~パート2~データベースアクティビティストリームの設定方法
- ~パート3~Athenaの設定方法
の3つにわけてあります。
目次
今回やることの概要
まず処理の全体像から見ていきましょう。
処理フロー図
処理フロー
- Auroraから非同期でログをKinesis Data Streamsへストリーミング
- KMSを使ってログを暗号化
- Kinesis Data StreamsからKinesis Data Firehoseへストリーミング
- Kinesis Data FirehoseからS3へログを保存
- Lambdaで利用してログを複合化、整形
- GZIPに圧縮
- Athenaを利用して保存されたログをオンデマンドで検索
実際のLambdaの復号化ロジックの部分はわかりづらいですが、全体のフローとしてはシンプルでわかりやすいかと思います。
設定方法
それではDASの設定手順について説明していきます。
ここからの作業は、以下が前提になります。
- Administrator権限を持っているIAMユーザーでログインしている。
- VPC、サブネット、Aurora等は事前に作成してある。
作業は、
- KMSでカスタマ管理キーの作成
- 復号化Lambdaの作成
- KMSのインターフェイス VPC エンドポイントの作成
- DASの有効化
- Kinesis Data Streamsの設定変更
- Kinesis Data Firehoseの作成
という流れでやっていきます。
KMSでカスタマ管理キーの作成
DASを暗号化、復号化する際に利用するKMSのキー作成を行います。
- AWSのマネジメントコンソール画面から、KMSのサービスの画面にアクセスします。
- 左側のペインから「カスタマー管理型のキー」をクリックし、「キーの作成」をクリックします。
- 以下の設定を行い、「次へ」をクリックします。
- キーのタイプ
- 対象
- キーの使用
- 暗号化および復号化
- 詳細オプション
- キーマテリアルオリジン
- KMS
- リージョンごと
- 単一リージョンキー
- キーマテリアルオリジン
- キーのタイプ
- 以下の設定を行い、「次へ」をクリックします。
- エイリアス
- 任意の名前
- 説明
- 任意の説明
- タグ
- 任意のタグ
- エイリアス
- 以下の2つはの画面はそのまま「次へ」をクリックします。
- 最後に設定した内容を確認し、「完了」をクリックして作成完了です。
復号化Lambdaの作成
続いてLambdaの作成です。
Lambdaはゼロから書くのでなく、公式で出ているサンプルを利用して作成を行います。
サンプルのまま利用しても問題なく復号化されますが、Athenaで検索しづらいフォーマットなため、一部修正を行います。
Filter Amazon Aurora database activity stream data for segregation and monitoring
https://github.com/aws-samples/aurora-das-processing/blob/main/lambda_function.py
また「aws_encryption_sdk」というライブラリがLambdaにデフォルトで入っていないため、ローカル or EC2上などでパッケージ化したzipファイルを作成する必要があります。
というわけで、編集したサンプルコードは以下になります。
#This Lambda function reads the Kinesis Data Firehose records as Input, decrypt the log records using KMS key, unzip the records and then categories the event type into S3 folder structure.
from __future__ import print_function
import json
import boto3
import base64
import zlib
import os
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import datetime
#REGION_NAME = os.environ['REGION_NAME'] # 'us-east-1'
RESOURCE_ID = os.environ['RESOURCE_ID'] #'cluster-ABCD1234'
#BUCKET_NAME = os.environ['bucket_name'] # 'dastestbucket'
enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)
#kms = boto3.client('kms', region_name=REGION_NAME)
kms = boto3.client('kms')
s3 = boto3.client('s3')
todays_date = datetime.datetime.now()
class MyRawMasterKeyProvider(RawMasterKeyProvider):
provider_id = "BC"
def __new__(cls, *args, **kwargs):
obj = super(RawMasterKeyProvider, cls).__new__(cls)
return obj
def __init__(self, plain_key):
RawMasterKeyProvider.__init__(self)
self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
def _get_raw_key(self, key_id):
return self.wrapping_key
def decrypt_payload(payload, data_key):
my_key_provider = MyRawMasterKeyProvider(data_key)
my_key_provider.add_master_key("DataKey")
#Decrypt the records using the master key.
decrypted_plaintext, header = enc_client.decrypt(
source=payload,
materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
return decrypted_plaintext
def decrypt_decompress(payload, key):
decrypted = decrypt_payload(payload, key)
#Decompress the records using zlib library.
decrypted = zlib.decompress(decrypted, zlib.MAX_WBITS + 16)
return decrypted
#Lambda Handler entry point
def lambda_handler(event, context):
output = []
#print("Received event: " + json.dumps(event, indent=2))
for dasRecord in event['records']:
recID = dasRecord['recordId']
data = base64.b64decode(dasRecord['data'])
# Do processing here
val_raw = processDASRecord(recID,data)
val_1 = json.dumps(val_raw)
val_2 = val_1.lstrip("[")
val_3 = val_2.replace('}]','}\n')
val_4 = val_3.replace('}, ','}\n')
#Record count has to match when we return to Firehose. If we don’t want certain records to reach destination – result should be equal to Dropped.
if len(val_raw)>0:
output_record = {
'recordId': dasRecord['recordId'],
'result': 'Ok',
'data': base64.b64encode(val_4.encode("utf-8"))
}
else:
output_record = {
'recordId': dasRecord['recordId'],
'result': 'Dropped',
'data': base64.b64encode(b'this is a dropped event')
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['records'])))
return {'records': output}
def processDASRecord(rID, rec):
record = json.loads(rec)
if record['type'] == 'DatabaseActivityMonitoringRecords':
dbEvents = record["databaseActivityEvents"]
dataKey = base64.b64decode(record["key"])
try:
#Decrypt the envelope master key using KMS
data_key_decrypt_result = kms.decrypt(CiphertextBlob=dataKey, EncryptionContext={'aws:rds:dbc-id':RESOURCE_ID})
except Exception as e:
print(e)
raise e
try:
plaintextEvents = decrypt_decompress(base64.b64decode(dbEvents), data_key_decrypt_result['Plaintext'])
except Exception as e:
print(e)
raise e
retObj = []
#parse thru all activity and categorize it.
try:
events = json.loads(plaintextEvents)
for dbEvent in events['databaseActivityEventList']:
#filter out events which you don't want to log.
if dbEvent['type']== "heartbeat": #or eventType == "READ":
#print ("Heart beat event - ignored event, dropping it.")
continue
if not (dbEvent.get('command') is None):
eventType = dbEvent['command']
#use this section to log all events in separate S3 folder.
#parse and write individual type of events to separate S3 folders.
#s3suffix = '/' + str(todays_date.year) + '/' + str(todays_date.month) + '/' + str(todays_date.day) + '/' + rID + '.txt'
#s3.put_object(Body=json.dumps(dbEvent, ensure_ascii=False), Bucket=BUCKET_NAME, Key = 'parsed/'+ eventType + s3suffix )
retObj.append(dbEvent)
except Exception as e:
print (e)
raise e
return retObj
- 編集した箇所
- 7行目
- os importを追加
- 15、17行目
- コメントアウト
- 16行目
- クラスターリソースIDを環境変数から取得するように変更
- 20行目、21行目
- 20行目をコメントアウトして、21 行目を追加
- 55行目
- コメントアウト
- 61~64行目
- str形に変更して、ログを整形
- 108行目
- コメントアウト
- 115、116行目
- コメントアウト
- 7行目
上記の通りで、大した変更はしてません。
61行目から64行目の変更は、ちょっと強引ではありますが、各ログファイルが以下のフォーマットでS3に保存されるようにしてます。
こうすることでAthenaで分析しやすくなります。
[{"logTime":~~~}]
[{"logTime":~~~},{"logTime":~~~}]
[{"logTime":~~~}]
[{"logTime":~~~},{"logTime":~~~}]
{"logTime":~~~}
{"logTime":~~~}
{"logTime":~~~}
{"logTime":~~~}
{"logTime":~~~}
{"logTime":~~~}
zipファイルの作成
まずLambda用のzipファイルを作成します、ローカルでもEC2でも良いので以下のような手順で行います。
- pythonのバージョンは、作成予定のLambdaと合わせるようにしてください。
- 作成したzipファイルはローカル or S3に保存しておきます。
# ディレクトリの作成移動
mkdir das-lambda
cd das-lambda
# zipファイルの作成
pip3 install aws_encryption_sdk -t ./
vi lambda_function.py # 復号化スクリプトをコピペ
zip das-lambda -r ./*
IAMポリシーの作成
- 以下のような、KMSのアクセス許可のポリシーを作成します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kms:Decrypt",
"Resource": "<作成したKMSキーのARN>"
}
]
}
IAMロールの作成
- 以下のポリシーをアタッチしてLambda用のIAMロールを作成します。
- 作成したIAMポリシー
- AWSLambdaBasicExecutionRole ※AWS管理ポリシー
Lambdaの作成
- 以下の設定でLambdaを作成します。
- 関数名
- 任意の名前
- ランタイム
- python3.8 ※zipファイルを作成した環境のpythonのバージョンに合わせてください。
- アーキテクチャ
- x86_64
- 実行ロール
- 既存のロールを使用する
- 作成したIAMロール
- 関数名
- 作成後、以下の設定を変更します。
- 一般設定
- タイムアウトを1分
- 環境変数
- キー:RESOURCE_ID
- 値:対象のAuroraクラスターのリソースID
- 一般設定
- 最後に、事前に作成しておいたzipファイルをアップロードします。
KMSのインターフェイス VPC エンドポイントの作成
続いて、KMSへアクセスするためのVPCエンドポイントを、Auroraが配置されているサブネットに作成します。
基本的に、Auroraはプライベートサブネットに配置することが多いと思っています。
この場合、NATゲートウェイやVPCエンドポイント等を利用しないとVPC外にあるKMSへアクセスが出来ず、ログの暗号化でエラーを起こしてしまいます。
今回はVPCエンドポイントを作成する方法で進めていきたいと思います。
※パブリックサブネットにAuroraを配置する場合は、NATゲートウェイとVPCエンドポイントは不要です。
セキュリティグループの作成
インタフェース型のVPCエンドポイントの実態は、ENIです。
そのため作成時にはセキュリティグループが必要になりますので、作成を行います。
- 以下の設定で、セキュリティグループを作成します。
- セキュリティグループ名
- 任意の名前
- 説明
- 任意の説明
- VPC
- Auroraが配置されているVPC
- インバウンドルール
- タイプ
- カスタムTCP
- ポート範囲
- 443
- ソース
- カスタム
- 選択したVPCのCIDR
- 説明
- 任意の説明
- タイプ
- セキュリティグループ名
エンドポイントの作成
- 以下の設定でエンドポイント作成します。
- 名前タグ
- 任意の名前
- サービスカテゴリ
- AWSのサービス
- サービス
- com.amazonaws.<リージョン名>.kms
- VPC
- Auroraが配置されているサブネット
- サブネット
- サブネットグループに指定しているサブネット
- セキュリティグループ
- 作成したセキュリティグループ
- ポリシー
- フルアクセス
- 名前タグ
DASの有効化
DASの有効化を行います。
有効化する際、Auroraクラスターの再起動が走るので、本番環境で行う際は注意してください。
- 以下の設定で、DASの有効化を行います。
- マスターキー
- 作成したKMSキー
- Database activity stream mode
- Asynchronous
- すぐに適用
- すぐに適用
- マスターキー
設定が完了するまで、しばらく待ちます。
Kinesis Data Streamsの設定変更
次にData Streamsの設定変更を行います。
DASを有効化することで、Data Streamsが自動的に作成されます。
この作業は必須ではないので、任意で行ってください。
- Kinesisのコンソール画面にアクセスし、左ペインより「データストリーム」をクリックします。
- 作成されたストリーム名をクリックします。
- 「設定」タブをクリックし、「容量モードを編集」をクリックします。
- 容量モードを「オンデマンド」に変更し、「変更を保存」をクリックします。
Kinesis Data Firehoseの作成
最後に、ログの復号化とS3へ保存行うためにFirehoseの作成を行います。
- Kinesisのコンソール画面にアクセスし、左ペインより「配信ストリーム」をクリックします。
- 「配信ストリームの作成」をクリックします。
- 以下の設定で、配信ストリームを作成します。
- ソース
- Amazon Kinesis Data Streams
- 送信先
- Amazon S3
- ソースの設定
- DASの有効化で作成されたData Sreams
- 配信ストリーム名
- 任意の名前
- AWS Lambdaでソースレコードを変換
- データ変換を有効にする
- チェックを入れる
- AWS Lambda関数
- 作成したLambda関数を指定
- バッファサイズ
- 0.2
- バッファ期間
- 60
- データ変換を有効にする
- 送信先の設定
- S3バケット
- ログを保存したいバケットを指定
- 動的パーティショニング
- 有効ではありません
- S3バケットプレフィックス
- 任意の値
- S3バケットエラー出力プレフィックス
- 任意の値
- バッファサイズ
- 5
- バッファ期間
- 300
- データレコードの圧縮
- GZIP
- データレコードの暗号化
- 有効ではありません
- S3バケット
- バックアップの設定
- 有効ではありません
- 詳細設定
- Amazon CloudWatch エラーのログ記録
- 有効
- 許可
- IAMロールを作成または更新
- タグ
- 任意のタグ
- Amazon CloudWatch エラーのログ記録
- ソース
Lambdaの設定の部分で、バッファサイズを0.2に変更するのが肝です。
ログの量が多くなってくると、復号化した際のデータ量が大きくなりLambdaの呼び出しペイロードの制限に引っかかりエラーになります。
Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。
https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation.html
結果
では、実際にログが想定通りに保存されるか検証をしてみます。
EC2から以下のようなSQLを、対象のAuroraに対して実行してみます。
select * from test;
実行してから少し待ち、保存先のS3を確認すると。。。
ちゃんと保存されてました!
ログの中身も確認します。
{
"logTime": "2022-11-24 14:08:19.699173+00",
"statementId": 11,
"substatementId": 1,
"objectType": "TABLE",
"command": "SELECT",
"objectName": "public.test",
"databaseName": "shohei",
"dbUserName": "postgres",
"remoteHost": "10.0.10.90",
"remotePort": "41434",
"sessionId": "637f798c.53d",
"rowCount": 2,
"commandText": "select * from test;",
"paramList": [],
"pid": 1341,
"clientApplication": "psql",
"exitCode": null,
"class": "READ",
"serverVersion": "14.5.0",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "10.0.155.65",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2022-11-24 14:08:19.699167+00",
"errorMessage": null
}
{
"logTime": "2022-11-24 14:08:23.683316+00",
"statementId": 24,
"substatementId": 1,
"objectType": "TABLE",
"command": "SELECT",
"objectName": "public.test",
"databaseName": "shohei",
"dbUserName": "postgres",
"remoteHost": "10.0.10.90",
"remotePort": "41434",
"sessionId": "637f798c.53d",
"rowCount": 2,
"commandText": "select * from test;",
"paramList": [],
"pid": 1341,
"clientApplication": "psql",
"exitCode": null,
"class": "READ",
"serverVersion": "14.5.0",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "10.0.155.65",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2022-11-24 14:08:23.68331+00",
"errorMessage": null
}
{
"logTime": "2022-11-24 14:08:23.864478+00",
"statementId": 25,
"substatementId": 1,
"objectType": "TABLE",
"command": "SELECT",
"objectName": "public.test",
"databaseName": "shohei",
"dbUserName": "postgres",
"remoteHost": "10.0.10.90",
"remotePort": "41434",
"sessionId": "637f798c.53d",
"rowCount": 2,
"commandText": "select * from test;",
"paramList": [],
"pid": 1341,
"clientApplication": "psql",
"exitCode": null,
"class": "READ",
"serverVersion": "14.5.0",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "10.0.155.65",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2022-11-24 14:08:23.864472+00",
"errorMessage": null
}
ちゃんと、想定通りのフォーマットで保存されていました!
まとめ
今回はDASの設定方法について説明しました。
Lambdaのzipファイルさえ作れれば、あとは手順に従って操作するだけなので簡単かと思います。
次回は、最後のAthenaでの検索について説明したいと思います。