Připojení ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku
PLATÍ PRO: Cassandra
Tento článek je jedním z řady článků o integraci Azure Cosmos DB for Apache Cassandra ze Sparku. Články se týkají operací připojení, jazyka DDL (Data Definition Language), základních operací jazyka DML (Data Manipulat Language) a pokročilé integrace azure Cosmos DB for Apache Cassandra ze Sparku.
Požadavky
Zřízení zvoleného prostředí Sparku [Azure Databricks | Azure HDInsight-Spark | Ostatní].
Závislosti pro připojení
Konektor Spark pro Cassandra: Konektor Spark se používá k připojení ke službě Azure Cosmos DB pro Apache Cassandra. Identifikujte a použijte verzi konektoru umístěného v centru Mavenu, která je kompatibilní s verzemi Sparku a Scala vašeho prostředí Spark. Doporučujeme prostředí, které podporuje Spark 3.2.1 nebo vyšší a konektor Spark dostupný na souřadnicích
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
mavenu . Pokud používáte Spark 2.x, doporučujeme prostředí se Sparkem verze 2.4.5 pomocí konektoru Spark na souřadnicíchcom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
Mavenu.Pomocná knihovna služby Azure Cosmos DB pro rozhraní API pro Cassandra: Pokud používáte verzi Spark 2.x, pak kromě konektoru Spark potřebujete další knihovnu s názvem azure-cosmos-cassandra-spark-helper s souřadnicemi
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
Mavenu ze služby Azure Cosmos DB, aby bylo možné zvládnout omezování rychlosti. Tato knihovna obsahuje vlastní objekt pro vytváření připojení a třídy zásad opakování.Zásady opakování ve službě Azure Cosmos DB jsou nakonfigurované tak, aby zpracovávaly výjimky stavového kódu HTTP 429("Velikost četnosti požadavků"). Azure Cosmos DB pro Apache Cassandra tyto výjimky překládá do přetížených chyb v nativním protokolu Cassandra a můžete to zkusit znovu s back-offs. Vzhledem k tomu, že Azure Cosmos DB používá model zřízené propustnosti, dochází k výjimkám omezování rychlosti požadavků při zvýšení rychlosti příchozího a výchozího přenosu dat. Zásady opakování chrání vaše úlohy Spark před špičkami dat, které momentálně překračují propustnost přidělenou kontejneru. Pokud používáte konektor Spark 3.x, implementace této knihovny se nevyžaduje.
Poznámka:
Zásady opakování můžou chránit úlohy Sparku pouze proti momentárním špičkám. Pokud jste nenakonfigurovali dostatek RU potřebných ke spuštění úlohy, zásady opakování se nepoužijí a třída zásad opakování výjimku znovu zvětšuje.
Podrobnosti o připojení účtu služby Azure Cosmos DB: Název účtu Azure API pro Cassandra, koncový bod účtu a klíč.
Optimalizace konfigurace propustnosti konektoru Spark
Uvedené v další části jsou všechny relevantní parametry pro řízení propustnosti pomocí konektoru Spark pro Cassandra. Aby bylo možné optimalizovat parametry pro maximalizaci propustnosti pro úlohy Sparku, spark.cassandra.output.concurrent.writes
spark.cassandra.concurrent.reads
je potřeba správně nakonfigurovat konfiguraci a spark.cassandra.input.reads_per_sec
konfigurace, aby se zabránilo příliš velkému omezování a back-off (což zase může vést k nižší propustnosti).
Optimální hodnota těchto konfigurací závisí na čtyřech faktorech:
- Množství propustnosti (jednotky žádostí) nakonfigurované pro tabulku, do které se data ingestují.
- Počet pracovních procesů v clusteru Spark.
- Počet exekutorů nakonfigurovaných pro vaši úlohu Sparku (kterou je možné řídit pomocí
spark.cassandra.connection.connections_per_executor_max
verze Sparku nebospark.cassandra.connection.remoteConnectionsPerExecutor
v závislosti na verzi Sparku) - Průměrná latence každého požadavku do služby Azure Cosmos DB, pokud jste kompletovali do stejného datového centra. Předpokládejme, že tato hodnota je 10 ms pro zápisy a 3 ms pro čtení.
Pokud máme například pět pracovních procesů a hodnotu spark.cassandra.output.concurrent.writes
= 1 a hodnotu spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, pak máme pět pracovních procesů, které jsou souběžně zapisovat do tabulky, z nichž každý má jedno vlákno. Pokud k provedení jednoho zápisu trvá 10 ms, můžeme odeslat 100 požadavků (1000 milisekund děleno 10) za sekundu na vlákno. S pěti pracovníky by to bylo 500 zápisů za sekundu. Při průměrných nákladech na pět jednotek žádostí (RU) na zápis by cílová tabulka potřebovala minimálně 2500 jednotek žádostí zřízených (5 RU × 500 zápisů za sekundu).
Zvýšení počtu exekutorů může zvýšit počet vláken v dané úloze, což může zase zvýšit propustnost. Přesný dopad toho ale může být proměnlivý v závislosti na úloze a řízení propustnosti s počtem pracovních procesů je determinističtější. Můžete také určit přesné náklady na danou žádost tím, že ji profilujete, abyste získali poplatek za jednotku žádosti (RU). To vám pomůže být přesnější při zřizování propustnosti pro tabulku nebo prostor klíčů. Podívejte se na náš článek , abyste pochopili, jak získat poplatky za jednotky žádostí na úrovni žádosti.
Škálování propustnosti v databázi
Konektor Cassandra Spark efektivně saturuje propustnost ve službě Azure Cosmos DB. V důsledku toho i při efektivních opakováních budete muset zajistit, abyste měli dostatečnou propustnost (RU) zřízenou na úrovni tabulky nebo prostoru klíčů, abyste zabránili chybám souvisejícím s rychlostí. Minimální nastavení 400 RU v dané tabulce nebo prostoru klíčů nebude stačit. I s minimálním nastavením konfigurace propustnosti může konektor Spark zapisovat rychlostí odpovídající přibližně 6000 jednotkám žádostí nebo více.
Pokud je nastavení RU vyžadované pro přesun dat pomocí Sparku vyšší, než je potřeba pro úlohu stabilního stavu, můžete v Azure Cosmos DB systematicky škálovat propustnost nahoru a dolů, aby vyhovovala potřebám vaší úlohy v daném časovém období. Přečtěte si náš článek o elastickém škálování v rozhraní API pro Cassandru a seznamte se s různými možnostmi pro programové a dynamické škálování.
Poznámka:
Výše uvedené pokyny předpokládají rozumně jednotnou distribuci dat. Pokud máte v datech významnou nerovnoměrnou distribuci (tj. nesměrovaný počet čtení a zápisů do stejné hodnoty klíče oddílu), může docházet k kritickým bodům i v případě, že máte v tabulce zřízený velký počet jednotek žádostí. Jednotky žádostí jsou rovnoměrně rozdělené mezi fyzické oddíly a nerovnoměrná distribuce dat může způsobit kritické body požadavků na jeden oddíl.
Parametry konfigurace propustnosti konektoru Sparku
Následující tabulka uvádí parametry konfigurace propustnosti specifické pro Azure Cosmos DB pro Apache Cassandra, které poskytuje konektor. Podrobný seznam všech parametrů konfigurace najdete na stránce s referenčními informacemi o konfiguraci úložiště GitHub konektoru Spark Cassandra.
Název vlastnosti | Výchozí hodnota | Popis |
---|---|---|
spark.cassandra.output.batch.size.rows | 0 | Počet řádků na jednu dávku Nastavte tento parametr na hodnotu 1. Tento parametr slouží k dosažení vyšší propustnosti pro náročné úlohy. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Nic | Maximální počet připojení na uzel na exekutor 10*n je ekvivalentní 10 připojení na uzel v clusteru Cassandra n-node. Pokud tedy pro cluster Cassandra s pěti uzly vyžadujete pět připojení na jeden uzel, měli byste tuto konfiguraci nastavit na 25. Upravte tuto hodnotu na základě stupně paralelismu nebo počtu exekutorů, pro které jsou nakonfigurované úlohy Sparku. |
spark.cassandra.output.concurrent.writes | 100 | Definuje počet paralelních zápisů, ke kterým může dojít na exekutor. Vzhledem k tomu, že nastavíte hodnotu batch.size.rows na hodnotu 1, nezapomeňte odpovídajícím způsobem vertikálně navýšit kapacitu této hodnoty. Upravte tuto hodnotu na základě stupně paralelismu nebo propustnosti, kterou chcete pro úlohu dosáhnout. |
spark.cassandra.concurrent.reads | 512 | Definuje počet paralelních čtení, ke kterým může dojít na exekutor. Upravte tuto hodnotu na základě stupně paralelismu nebo propustnosti, kterou chcete pro úlohu dosáhnout. |
spark.cassandra.output.throughput_mb_per_sec | Nic | Definuje celkovou propustnost zápisu na exekutor. Tento parametr se dá použít jako horní limit propustnosti úlohy Sparku a založit ho na zřízené propustnosti kontejneru Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | Nic | Definuje celkovou propustnost čtení na exekutor. Tento parametr se dá použít jako horní limit propustnosti úlohy Sparku a založit ho na zřízené propustnosti kontejneru Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definuje počet dávek na jednu úlohu Sparku, která se dá před odesláním do rozhraní API pro Cassandra uložit v paměti. |
spark.cassandra.connection.keep_alive_ms | 60000 | Definuje časové období, do kterého jsou k dispozici nepoužívané připojení. |
Na základě očekávané úlohy Sparku upravte propustnost a stupeň paralelismu těchto parametrů a propustnost, kterou jste pro svůj účet služby Azure Cosmos DB zřídili.
Připojení ke službě Azure Cosmos DB pro Apache Cassandra ze Sparku
cqlsh
Následující příkazy podrobně uvádějí, jak se připojit ke službě Azure Cosmos DB for Apache Cassandra z cqlsh. To je užitečné pro ověření při procházení ukázek ve Sparku.
Ze systémů Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
Následující článek se zabývá zřizováním clusterů Azure Databricks, konfigurací clusteru pro připojení ke službě Azure Cosmos DB for Apache Cassandra a několika ukázkovými poznámkovými bloky, které pokrývají operace DDL, operace DML a další.
Práce se službou Azure Cosmos DB pro Apache Cassandra z Azure Databricks
2. Azure HDInsight-Spark
Následující článek se zabývá službou HDinsight-Spark, zřizováním, konfigurací clusteru pro připojení ke službě Azure Cosmos DB for Apache Cassandra a několika ukázkovými poznámkovými bloky, které pokrývají operace DDL, operace DML a další.
Práce se službou Azure Cosmos DB pro Apache Cassandra z Azure HDInsight-Spark
3. Prostředí Sparku obecně
I když výše uvedené části byly specifické pro služby PaaS založené na Azure Sparku, tato část se věnuje všem obecným prostředím Spark. Níže jsou podrobně popsány závislosti konektorů, importy a konfigurace relace Sparku. Část Další kroky popisuje ukázky kódu pro operace DDL, operace DML a další.
Závislosti konektorů:
- Přidání souřadnic Mavenu pro získání konektoru Cassandra pro Spark
- Přidání souřadnic Mavenu pro pomocnou knihovnu azure Cosmos DB pro rozhraní API for Cassandra
Importuje:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Konfigurace relace Sparku:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Další kroky
Následující články ukazují integraci Sparku se službou Azure Cosmos DB pro Apache Cassandra.