EventBridge Pipes のソースとしての Amazon MQ メッセージブローカー。 - Amazon EventBridge

EventBridge Pipes のソースとしての Amazon MQ メッセージブローカー。

EventBridge Pipes を使用して、Amazon MQ メッセージブローカーからレコードを受け取ることができます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。Amazon MQ に固有の設定があり、パイプをセットアップするときに選択できます。EventBridge Pipes は、データを送信先に送信するときに、メッセージブローカーのレコードの順序を維持します。

Amazon MQ は、Apache ActiveMQ および RabbitMQ 用のマネージドメッセージブローカーサービスです。メッセージブローカーを使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューをイベント送信先とする正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。ブローカーをインストールすると、さまざまなネットワークトポロジやその他のインフラストラクチャのニーズがインスタンスに提供されます。

Amazon MQ ソースには、次の設定制限があります。

  • クロスアカウント – EventBridge はクロスアカウント処理をサポートしていません。EventBridge を使用して、別の AWS アカウントにある Amazon MQ メッセージブローカーからのレコードを処理することはできません。

  • 認証 - ActiveMQ では、ActiveMQ SimpleAuthenticationPlugin のみサポートされています。RabbitMQ の場合、PLAIN 認証メカニズムのみサポートされています。認証情報を管理するには、AWS Secrets Manager を使用してください。ActiveMQ 認証の詳細については、「Amazon MQ デベロッパーガイド」の「 Integrating ActiveMQ brokers with LDAP」を参照してください。

  • 接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、「Amazon MQ デベロッパーガイド」の「*Amazon MQ のクォータ」 の「ブローカー」のセクションを参照してください。

  • 接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、 パイプが VPC にアクセスしてメッセージを受信する必要があります。

  • イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、パイプと対話するとき、内部的にトピックとして、かつ外部的にキューとして動作します。詳細については、Apache ActiveMQ ウェブサイトの Virtual Destinations および RabbitMQ ウェブサイトの Virtual Hostsを参照してください。

  • ネットワークトポロジ - ActiveMQ の場合、パイプに対して、1つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、パイプごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、「Amazon MQ デベロッパーガイド」の「Active MQ ブローカーアーキテクチャ」および「Rabbit MQ ブローカーアーキテクチャ」を参照してください。

  • プロトコル — サポートされるプロトコルは、使用する Amazon MQ の統合のタイプによって異なります。

    • ActiveMQ 統合の場合、EventBridge は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。メッセージの使用は、他のプロトコルではサポートされていません。EventBridge は JMS プロトコル内の TextMessage 操作と BytesMessage 操作のみをサポートします。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの OpenWire を参照してください。

    • RabbitMQ 統合の場合、EventBridge は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの AMQP 0-9-1 Complete Reference Guide を参照してください。

EventBridge は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、「Amazon MQ デベロッパーガイド」の「Amazon MQ リリースノート」を参照してください。

注記

デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイしていないブローカーの場合、EventBridge はウィンドウが終了するまでメッセージを処理しません。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

ActiveMQ

[ { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ]

RabbitMQ

[ { "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "eventSourceKey": "pizzaQueue::/", "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ]

コンシューマーグループ

Amazon MQ と対話するため、EventBridge は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、パイプ UUID と同じ ID で作成されます。

Amazon MQ トソースの場合、EventBridge はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。EventBridge は、以下のいずれかが発生すると、メッセージをプルします。

  • 処理されたレコードのペイロードサイズは最大 6 MB に達します。

  • バッチウィンドウの有効期限が切れる。

  • レコード数がバッチサイズ全体に達します。

EventBridge はバッチを単一のペイロードに変換してから、関数を呼び出します。メッセージは永続化も逆シリアル化もされません。その代わりに、コンシューマーグループはそれらをバイトの BLOB として取得します。次に、base64 でエンコードして JSON ペイロードにします。パイプがバッチ内のいずれかのメッセージに対してエラーを返すとEventBridge は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

ネットワーク構成

デフォルトではAmazon MQ、 ブローカーは PubliclyAccessible フラグを false に設定して作成されます。ブローカーにパブリック IP アドレスが与えられるのは、PubliclyAccessible が true に設定されている場合のみです。パイプでフルアクセスする場合、ブローカーはパブリックエンドポイントを使用するか、VPC へアクセスを提供する必要があります。

Amazon MQ ブローカーがパブリックにアクセス可能ではない場合、EventBridge は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。

  • Amazon MQ ブローカーの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス·(アウトバウンド)·を使用できます。パブリックサブネットの場合、これはマネージド NAT ゲートウェイである必要があります。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。

  • EventBridge Pipes は AWS PrivateLink を介したイベント配信もサポートしているため、パブリックインターネットを経由することなく、Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースからパイプターゲットにイベントを送信できます。パイプを使用して、インターネットゲートウェイのデプロイ、ファイアウォールルールの設定、プロキシサーバーの設定を必要とせずに、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド型 Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。

    VPC エンドポイントを設定するには、「AWS PrivateLink ユーザーガイド」の「VPC エンドポイントの作成」を参照してください。サービス名では、com.amazonaws.region.pipes-data を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – ソース用に指定されたセキュリティグループに対して Amazon MQ ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Amazon MQ ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキスト用 9092

    • TLS 用 9094

    • SASL 用 9096

    • IAM 用 9098

注記

Amazon VPC の設定は、Amazon MQ API を使用して検出できます。セットアップ中に設定する必要はありません。