はじめに

Aurora PostgreSQL で長時間ロック(ブロッキング)が発生した際に、自動でアラートを飛ばしたくなりました。
5 分ごとにブロッキング中のセッションがあれば SNS 経由で通知するフローを構築したので、設定手順をまとめます。

概要

  • EventBridge で 5 分間隔のスケジュールを作成
  • Secrets Manager に RDS 接続情報を保存
  • notification Lambda (<LAMBDA_NOTIFY_NAME>) の実装

EventBridge で 5 分間隔のスケジュールを作成

  1. コンソールEventBridgeルール「ルールの作成」
  2. 基本設定
    ・名前:<SCHEDULE_RULE_NAME>
    ・説明:Run <LAMBDA_MAIN_NAME> every 5 minutes
    ・ステータス:有効
  3. スケジュール式
    ・タイプ:スケジュール
    ・式:rate(5 minutes)
  4. ターゲット
    ・ターゲットタイプ:Lambda 関数
    ・関数名:<LAMBDA_MAIN_NAME>
    ・入力:固定 JSON → {}
  5. Flexible time window:Off
  6. 保存

CLI例:

aws events put-rule \
  --name <SCHEDULE_RULE_NAME> \
  --schedule-expression "rate(5 minutes)" \
  --state ENABLED

aws events put-targets \
  --rule <SCHEDULE_RULE_NAME> \
  --targets Id=1,Arn=$(aws lambda get-function --function-name <LAMBDA_MAIN_NAME> \
  --query 'Configuration.FunctionArn' --output text),Input="{}"

aws lambda add-permission \
  --function-name <LAMBDA_MAIN_NAME> \
  --statement-id EventBridgeInvoke \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn "arn:aws:events<AWS_REGION>:<ACCOUNT_ID>:rule/<SCHEDULE_RULE_NAME>"/

Secrets Manager に RDS 接続情報を保存

  • シークレット名:<SECRET_NAME>
  • シークレット文字列(JSON)例:
{
  "host":     "<RDS_ENDPOINT>",
  "port":     "5432",
  "dbname":   "<DB_NAME>",
  "username": "<DB_USER>",
  "password": "<DB_PASSWORD>"
}

main Lambda (<LAMBDA_MAIN_NAME>) の実装

1. pg8000 Layer の作成

mkdir pg8000-layer && cd pg8000-layer
mkdir python
python3 -m pip install pg8000 -t python/
zip -r pg8000-layer.zip python
aws lambda publish-layer-version \
  --layer-name pg8000 \
  --description "pg8000 for Python3.9" \
  --zip-file fileb://pg8000-layer.zip \
  --compatible-runtimes python3.9 \
  --region <AWS_REGION>

2.IAM ロール (<LAMBDA_MAIN_ROLE>) にポリシーを付与

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect":"Allow",
      "Action":[
        "secretsmanager:GetSecretValue",
        "kms:Decrypt"
      ],
      "Resource":[
        "arn:aws:secretsmanager:<AWS_REGION>:<ACCOUNT_ID>:secret:<SECRET_NAME>*",
        "<KMS_KEY_ARN>"
      ]
    },
    {
      "Effect":"Allow",
      "Action":["lambda:InvokeFunction"],
      "Resource":"arn:aws:lambda:<AWS_REGION>:<ACCOUNT_ID>:function:<LAMBDA_NOTIFY_NAME>"
    }
  ]
}

3.VPC 設定 & VPC エンドポイント

  • VPC<VPC_ID>
  • サブネット<SUBNET_ID_1>, <SUBNET_ID_2>
  • セキュリティグループ<SECURITY_GROUP_ID>
  • Endpoints(Interface)
    • Secrets Manager:com.amazonaws.<AWS_REGION>.secretsmanager
    • Lambda API:com.amazonaws.<AWS_REGION>.lambda

4. Lambda 環境変数

KeyValue
SECRET_ARNarn:aws:secretsmanager:<AWS_REGION>:<ACCOUNT_ID>:secret:<SECRET_NAME>
NOTIFY_FUNCTION_NAME<LAMBDA_NOTIFY_NAME>

5. コード全文

import os, json, time, boto3, pg8000.native as pg

sm = boto3.client('secretsmanager')
lambda_client = boto3.client('lambda')

def lambda_handler(event, context):
    start = time.time()
    # 1) シークレット読み込み
    sec = sm.get_secret_value(SecretId=os.environ['SECRET_ARN'])
    creds = json.loads(sec['SecretString'])

    # 2) DB 接続
    conn = pg.Connection(
        host=creds['host'], port=int(creds['port']),
        database=creds['dbname'],
        user=creds['username'], password=creds['password']
    )

    # 3) ブロッキング検知クエリ
    rows = conn.run("""
      SELECT pid,
             pg_blocking_pids(pid)::text AS blocking_pids,
             REGEXP_REPLACE(query,'\r|\n|\r\n',' ','g') AS query,
             query_start
      FROM pg_stat_activity
      WHERE array_length(pg_blocking_pids(pid),1) > 0
      ORDER BY array_length(pg_blocking_pids(pid),1) DESC, query_start;
    """)
    conn.close()

    # 4) 結果整形
    items  = [[pid, bl, q, dt.isoformat() if dt else None] for pid,bl,q,dt in rows]
    blocked = len(items)

    # 5) 通知分岐
    if blocked > 0:
        lambda_client.invoke(
            FunctionName=os.environ['NOTIFY_FUNCTION_NAME'],
            InvocationType='Event',
            Payload=json.dumps({'blocked_count':blocked,'items':items})
        )

    return {"statusCode":200,"body":{"blocked_count":blocked,"items":items}}

notification Lambda (<LAMBDA_NOTIFY_NAME>) の実装

1. SNS トピック作成

  • 名前:<SNS_TOPIC_NAME>
  • Display name:RDS Blocking Alert
  • サブスクリプション:Email, Slack など

2. IAM ロール (<LAMBDA_NOTIFY_ROLE>) の権限

{
  "Version":"2012-10-17",
  "Statement":[
    {
      "Effect":"Allow",
      "Action":["sns:Publish"],
      "Resource":"arn:aws:sns:<AWS_REGION>:<ACCOUNT_ID>:<SNS_TOPIC_NAME>"
    }
  ]
}

3. Lambda 環境変数

KeyValue
SNS_TOPIC_ARNarn:aws:sns:<AWS_REGION>:<ACCOUNT_ID>:<SNS_TOPIC_NAME>

4. コード全文

import os, json, boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    blocked = event.get('blocked_count', 0)
    items   = event.get('items', [])
    if blocked > 0:
        msg = f"【警告】RDS ブロッキング検出:{blocked} 件\n\n"
        for pid, bl, q, start in items:
            msg += f"pid={pid}, blocking={bl}, start={start}\n{q}\n\n"
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Subject="RDS Blocking Alert",
            Message=msg
        )
    return {"statusCode":200,"body":"Done"}

動作確認

  1. EventBridge → 5 分後に main Lambda が起動
  2. CloudWatch Logs/aws/lambda/<LAMBDA_MAIN_NAME> に定期ログ
  3. ブロッキング発生時 → notification Lambda → SNS 通知が届く

まとめ

以上で、5 分ごとに PostgreSQL のブロッキングを自動検知してアラートを飛ばす仕組みが完成します。
ぜひ試してみてください!