次の方法で共有


Azure Cosmos DB Spark コネクタ: スループットの制御

適用対象: NoSQL

Spark コネクタを使うと、Apache Spark を使って Azure Cosmos DB と通信できます。 この記事では、スループット制御機能のしくみについて説明します。 スループット制御の使用を開始するには、GitHub の Spark サンプルを確認してください。

この記事では、Azure Cosmos DB Spark コネクタでのグローバル スループット制御グループの使用について説明しますが、この機能は Java SDK でも使用できます。 SDK では、グローバルとローカルのスループット制御グループを使って、単一クライアント接続インスタンスのコンテキストで要求ユニット (RU) の消費量を制限できます。 たとえば、このアプローチを単一のマイクロサービス内のさまざまな操作に適用したり、単一のデータ読み込みプログラムに適用したりできます。 詳しくは、Java SDK でスループット制御を使用する方法についての記事をご覧ください。

警告

スループット制御は、ゲートウェイ モードではサポートされていません。 現在、サーバーレス Azure Cosmos DB アカウントtargetThroughputThreshold を使ってパーセンテージを定義しようとすると、エラーが発生します。 spark.cosmos.throughputControl.targetThroughput を使うと、ターゲット スループット/RU の絶対値のみを指定できます。

スループット制御が重要なのはなぜですか?

スループット制御は、コンテナーに対して実行されるアプリケーションのパフォーマンス ニーズを分離するのに役立ちます。 スループット制御は、特定の Spark クライアントが消費できる RU の量を制限します。

クライアント側スループット制御から恩恵を受ける高度なシナリオがいくつかあります。

  • 操作やタスクによって優先順位が異なる: データ インジェストやコピー アクティビティのため、通常のトランザクションが抑えられないようにする必要があります。 一部の操作やタスクは待ち時間の影響を受けにくく、他のものよりも抑えられることに高い耐性を持ちます。
  • 異なるユーザーまたはテナントに公平性と分離性を提供する: 通常、アプリケーションには多くのユーザーがいます。 一部のユーザーが送信する要求が多すぎると、使用できるすべてのスループットを消費し、他のユーザーが抑えられる可能性があります。
  • 異なる Azure Cosmos DB クライアント間のスループットの負荷分散: 一部のユース ケースでは、すべてのクライアントにスループットが公平に (等しく) 割り当てられることが重要です。

スループット制御により、必要に応じて、より詳細なレベルの RU レート制限を実現できます。

スループット制御のしくみ

Spark コネクタのスループット制御を構成するには、最初に、スループット制御メタデータを定義するコンテナーを作成します。 パーティション キーは groupId で、ttl を有効にします。 ここでは、Spark SQL を使ってこのコンテナーを作成し、ThroughputControl という名前にします。

    %sql
    CREATE TABLE IF NOT EXISTS cosmosCatalog.`database-v4`.ThroughputControl 
    USING cosmos.oltp
    OPTIONS(spark.cosmos.database = 'database-v4')
    TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');

前の例では、自動スケーリングが有効なコンテナーを作成しています。 標準プロビジョニングを希望する場合は、autoScaleMaxThroughputmanualThroughput に置き換えることができます。

重要

スループット制御機能が動作するには、パーティション キーを /groupId として定義し、ttl を有効にする必要があります。

特定のアプリケーションの Spark 構成内で、ワークロードのパラメーターを指定できます。 次の例では、スループット制御を enabled と設定しています。 この例では、スループット制御グループの name パラメーターと targetThroughputThreshold パラメーターを定義しています。 また、スループット制御グループが保持される databasecontainer パラメーターも定義します。

    "spark.cosmos.throughputControl.enabled" -> "true",
    "spark.cosmos.throughputControl.name" -> "SourceContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.95", 
    "spark.cosmos.throughputControl.globalControl.database" -> "database-v4", 
    "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl"

前の例では、targetThroughputThreshold パラメーターが 0.95 と定義されています。 クライアントの消費量がコンテナーに割り当てられているスループットの 95% (+/- 5 から 10%) を超えると、レート制限が発生します (要求は再試行されます)。 この構成は、次の例のようなスループット コンテナーにドキュメントとして格納されます。

    {
        "id": "ZGF0YWJhc2UtdjQvY3VzdG9tZXIvU291cmNlQ29udGFpbmVyVGhyb3VnaHB1dENvbnRyb2w.info",
        "groupId": "database-v4/customer/SourceContainerThroughputControl.config",
        "targetThroughput": "",
        "targetThroughputThreshold": "0.95",
        "isDefault": true,
        "_rid": "EHcYAPolTiABAAAAAAAAAA==",
        "_self": "dbs/EHcYAA==/colls/EHcYAPolTiA=/docs/EHcYAPolTiABAAAAAAAAAA==/",
        "_etag": "\"2101ea83-0000-1100-0000-627503dd0000\"",
        "_attachments": "attachments/",
        "_ts": 1651835869
    }

スループット制御では、操作ごとに RU の事前計算は行われません。 代わりに、応答ヘッダーに基づいて操作の RU 使用量を追跡します。 そのため、スループット制御は近似値に基づいており、特定の時点でその量のスループットをグループが使用できることは保証されません。

このため、構成された RU が非常に小さく、単一の操作ですべてを使用できる場合、スループット制御では、RU が構成された制限を超えることを回避できません。 スループット制御は、構成されている制限が、特定の制御グループ内の 1 つのクライアントが実行できる単一の操作より大きい場合に、最適に機能します。

クエリまたは変更フィードを使って読み取る場合は、spark.cosmos.read.maxItemCount (既定値は 1000) でページ サイズを適度な量に構成する必要があります。 これにより、クライアントのスループット制御をより高い頻度で再計算し、特定の時点でより正確に反映できます。 一括を使って書き込みジョブのスループット制御を使うときは、スループット制御を可能な限り早く開始できるよう、1 つの要求で実行されるドキュメントの数はスロットリング レートに基づいて自動的に調整されます。

警告

targetThroughputThreshold パラメーターは "変更できません"。 ターゲット スループットしきい値を変更すると、新しいスループット制御グループが作成されます。 (バージョン 4.10.0 以降を使っている場合は、同じ名前にすることができます)。グループを使っているすべての Spark ジョブで新しいしきい値がすぐに使われるようにしたい場合は、それらを再起動する必要があります。 そうしないと、次の再起動後に新しいしきい値が取得されます。

スループット制御グループを使う Spark クライアントごとに、数秒間の ttl で、ThroughputControl コンテナーにレコードが作成されます。 その結果、Spark クライアントがアクティブに実行されなくなると、ドキュメントはすぐに消去されます。 次に例を示します。

    {
        "id": "Zhjdieidjojdook3osk3okso3ksp3ospojsp92939j3299p3oj93pjp93jsps939pkp9ks39kp9339skp",
        "groupId": "database-v4/customer/SourceContainerThroughputControl.config",
        "_etag": "\"1782728-w98999w-ww9998w9-99990000\"",
        "ttl": 10,
        "initializeTime": "2022-06-26T02:24:40.054Z",
        "loadFactor": 0.97636377638898,
        "allocatedThroughput": 484.89444487847,
        "_rid": "EHcYAPolTiABAAAAAAAAAA==",
        "_self": "dbs/EHcYAA==/colls/EHcYAPolTiA=/docs/EHcYAPolTiABAAAAAAAAAA==/",
        "_etag": "\"2101ea83-0000-1100-0000-627503dd0000\"",
        "_attachments": "attachments/",
        "_ts": 1651835869
    }

各クライアント レコードの loadFactor 属性では、特定のクライアントの負荷が、スループット制御グループ内の他のクライアントに対して相対的に表されます。 allocatedThroughput 属性は、現在このクライアントに割り当てられている RU の数を示します。 Spark コネクタは、各クライアントに割り当てられるスループットを、その負荷に基づいて調整します。 これにより、各クライアントは、その負荷に比例して使用可能なスループットを割り当てられます。 すべてのクライアントを合わせた消費量は、それらが属しているスループット制御グループに割り当てられた合計を超えません。