SQS + EventBridge Pipes でのインフライトメッセージの削除

この記事は AWS LambdaとServerless Advent Calendar 2022 の24日目の記事です。

EvenBridge Pipes のイベントソースに SQS キューを利用したときのインフライトメッセージの削除処理について試していきます。

インフライトメッセージとは、キューに入っているメッセージをコンシューマーが受信したけど、キューから未削除のメッセージのことです。以下の状態のメッセージです。

はじめに

サービス間を疎結合につなぐ選択肢としてメッセージキューがあります。AWS だと Amazon SQS を選択する方が多いでしょう。

この SQS に入っているメッセージを処理するコンシューマーとしての Lambda が本当に最高です。理由は以下のスライドに書かれている通り、受信処理にまつわる様々な実装を Lambda サービス側にオフロードし、実装者はメッセージを扱うビジネスロジックにフォーカスできるからです。

ただ場合によっては、Lambda 以外のサービスを使いたいこともあります。Step Functions とか ECS タスク(コンテナ実行)はその代表格でしょう。

特に僕は Step Functions が大好きで、様々なシーンで使います。その際 SQS とのマッピング処理のためだけに Lambda を使っていたことがありました。以下のような使い方です。

ただこの使い方だと、Step Functions のステートマシンを async(非同期)で Lambda から実行すると、ステートマシンの成否を待たずして SQS にあるインフライトメッセージが削除され、ステートマシンが異常終了した場合、メッセージは失われてしまいます。

今度は sync(同期) で実行すると、ステートマシンの異常終了時に Lambda で Exception を出せば、メッセージはキューからは削除されません。ただこの場合、Lambda の実行時間が長くなり、コスト面の無駄遣いが気になりました。

Step Functions と SQS がネイティブに統合されればなという夢を長らく抱いていたのは私だけではないでしょう。

そこに re:invent 2022 颯爽と登場したのが EventBridge Pipes でした。EvenBridge Pipes については以前ブログを書きましたのでこちらをご覧いただければとおもいますが、まさにLambda がイベント/メッセージ関連のサービスと統合する際に、Lambda 関数の前段で Lambda サービスが行なってきた処理部分を EventBridge Pipes が担ってくれます。

つまり、SQS - EventBridge Pipes - Step Functions と繋ぐことで、SQS + Lambda のときと同じ恩恵を受けて統合することが可能です。最高です!

メッセージのポーリングのところやメッセージ量に合わせた実行環境のスケールのところは、ドキュメントを見たり、実際にテストもしてみましたが Lambda サービスと変わらないように見えました。

気になるのは、ターゲットとなるサービス部分の呼び出し後のインフライトメッセージの削除処理 (DeleteMessage) です。呼び出しサービスによっても異なる部分はあるはずですが、基本は 処理自体が sync か async かによって決まるのではと予測できます。

この SQS + EventBridge Pipes におけるインフライトメッセージの削除およびエラー時のリトライ処理の制御について検証します。

検証方法

以下のようなシンプルな EventBridge Pipes を作りました。

  • イベントソース: SQS の標準キュー
  • フィルター: N/A
  • Enrich: Step Functions ステートマシン(Pass のみ)
    • リトライの挙動確認用に、特に何の処理もしないステートマシンを挟んでいます
  • ターゲット: Lambda関数/Step Functions ステートマシン

内容/条件

SQS マネージメントコンソールの “Send and receive message” でメッセージを送信します。

そして以下の条件で、ターゲットの正常終了時・異常終了時 EventBridge Pipes がどのタイミングでインフライトメッセージを削除するかを確認します。

Step Functions で invoke タイプ: sync は、STANDARD ワークフローではサポートされていないため、EXPRESS ワークフローを利用します。

  • ターゲット: Lambda 関数
    • Invoke Type: sync
    • Invoke Type: async
  • ターゲット: Step Functions
    • Invoke Type: sync
    • Invoke Type: async

確認方法

SQS の メトリクスのうち、以下のメトリクスをモニタリングすることでメッセージの状況を Watch します。

  • NumberOfMessagesSent: キューに追加されたメッセージ数
  • NumberOfMessagesReceived: ReceiveMessage が実行され返されたメッセージ数
  • NumberOfMessagesDeleted: キューから削除されたメッセージ数
  • ApproximateNumberOfMessagesNotVisible: 現在コンシューマーで処理中のメッセージ数
  • ApproximateNumberOfMessagesVisible: 現在キューから取得可能なメッセージ数

また、こちらに書いた通り、2023 年 1 月 1 日現在 EventBridge Pipes でのデバッグは少し足りていない印象なので、ターゲットや Enrich となる Lambda や Step Functions のロギングを設定するこで、より多くの情報を得るようにしました。 2023-11-16にアップデートがあり、Event Bridge Pipes にデバッグ用のロギング機能が実装されました。今なら上記のようなワークアラウンドは不要です。

各リソース定義

  • 正常終了する Lambda 関数
export const handler = async(event) => {
    const response = {
        statusCode: 200,
        body: event,
    };
    console.log(response);
    return response;
};
  • 異常終了する Lambda 関数
export const handler = async(event) => {
    const response = {
        statusCode: 500,
        body: event,
    };
    console.log(response);
    return x + 10;
};
  • 正常終了するステートマシン
{
  "Comment": "A description of my state machine",
  "StartAt": "Pass",
  "States": {
    "Pass": {
      "Type": "Pass",
      "End": true
    }
  }
}
  • 異常終了するステートマシン
{
  "Comment": "A description of my state machine",
  "StartAt": "DetectSentiment",
  "States": {
    "DetectSentiment": {
      "Type": "Task",
      "End": true,
      "Parameters": {
        "LanguageCode": "Dummy",
        "Text": "Dummy"
      },
      "Resource": "arn:aws:states:::aws-sdk:comprehend:detectSentiment"
    }
  }
}

検証結果

まず結論から、以下表のような挙動をしました。概ね想定通りの動きをしています。

  • 正常終了パターン

    ターゲット SQS DeleteMessage
    Lambda(sync) Lambda 関数終了後に削除
    Lambda(async) Lambda 関数 invoke して削除
    Step Functions(sync) ステートマシン終了後に削除
    Step Functions(async) ステートマシン invoke して削除
  • 異常終了パターン

    ターゲット SQS DeleteMessage
    Lambda(sync) 削除されない。可視性タイムアウト後再度キューから取り出されて処理
    Lambda(async) Lambda 関数 invoke して削除。関数自体は 2 回リトライされた(合計3回実行)
    Step Functions(sync) 削除されない。可視性タイムアウト後再度キューから取り出されて処理
    Step Functions(async) ステートマシン invoke して削除。ステート自体は 2 回リトライされた(合計3回実行)

Step Functions の sync 実行が EXPRESS ワークフローのみでサポートなので Job 実行 (.sync) やコールバック (.waitForTaskToken) 統合パターンなどが利用できないのが注意点ですが、Step Functions と SQS をいい感じに統合できることを確認できました。

また、async で実行に成功した後に Lambda や ステートマシン内でエラーが発生した際も、Pipes から ターゲットサービスへのリクエストが 2 回リトライ(合計3回実行)されました。Enrich 用のステートマシンの実行履歴はなく、あくまでターゲットへの送信処理自体のリトライのようでした。この辺りは un-documented な挙動なので保証はできませんが、新たな発見でした。

他に知っておいた方が良さそうなこと

Partial batch failure のススメ

EventBridge Pipes ではターゲットへのバッチ処理の際に、デフォルトでは完全に成功すればすべてのメッセージが成功したとしてキューから削除、エラーを返すと仮に一部のレコードのみの失敗の場合もすべてのメッセージが失敗したとしてキューに残されるという挙動をします。

基本は at least once を前提にした作りにしておくべきではありますが、どちらにしても不要なリトライやデータの読み出しはコスト的にもやりたくないものです。

そこで、ターゲットからのレスポンスのペイロードに batchItemFailures というパラメータでエラーの発生したアイテムを含めることで、部分的なリトライを行うことが可能です。

SQS の場合、itemIdentifier は messageId です。 Kinesis と DynamoDB Streams の場合、は eventID です。

{
	"batchItemFailures": [
		{
			"itemIdentifier": "id2"
		},
		{
			"itemIdentifier": "id4"
		}
	]
}

大規模にイベントを扱うアプリケーションを想定するとバッチ処理は必須になっていきます。その際にこちらの機能は是非利用すると良いでしょう、

参考ドキュメントはこちらです。

デッドレターキューのススメ

リトライをしたとしても、そもそもデータが良くない(想定外のもの)とかだと、永遠にリトライを繰り返してしまいます。

こういうことを想定して、イベントソースである SQS 側に デッドレターキュー(配信不能キュー)を設定しておきましょう。

ただ、あくまでリトライの結果「配信不能」と判断されたものをデッドレターキューに送信したいものです。リドライブポリシーで maxReceiveCount を十分なリトライがされてから送信されるようにコントロールしておきましょう。

おまけ

EventBridge Pipes では、イベントソースにもターゲットにもできるサービスがあります。SQS はその一つです。「もしかして新しいサーバーレス無限ループになりうるのでは?」と思い、イベントソースとターゲットに同じ SQS のキューを指定できるか試してみました。

結果無理でした。ちゃんとバリデーションされており、以下のようなエラーになりました。

ValidationException: 
  Source: Creating a pipe with the same source and target resource is not permitted.
  Target: Creating a pipe with the same source and target resource is not permitted.

まぁ、特にやる意味もないですしね。