Ansluta till Azure Cosmos DB för Apache Cassandra från Spark
GÄLLER FÖR: Kassandra
Den här artikeln är en av en serie artiklar om Azure Cosmos DB för Apache Cassandra-integrering från Spark. Artiklarna beskriver anslutningar, DDL-åtgärder (Data Definition Language), grundläggande DML-åtgärder (Data Manipulation Language) och avancerad Azure Cosmos DB för Apache Cassandra-integrering från Spark.
Förutsättningar
Etablera ditt val av Spark-miljö [Azure Databricks | Azure HDInsight-Spark | Andra].
Beroenden för anslutning
Spark-anslutningsapp för Cassandra: Spark-anslutningsappen används för att ansluta till Azure Cosmos DB för Apache Cassandra. Identifiera och använd den version av anslutningsappen som finns i Maven Central som är kompatibel med Spark- och Scala-versionerna av din Spark-miljö. Vi rekommenderar en miljö som stöder Spark 3.2.1 eller senare och spark-anslutningsappen som är tillgänglig vid maven-koordinaterna
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Om du använder Spark 2.x rekommenderar vi en miljö med Spark version 2.4.5 med spark-anslutningsappen vid maven-koordinaternacom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Azure Cosmos DB-hjälpbibliotek för API för Cassandra: Om du använder en version av Spark 2.x behöver du, förutom Spark-anslutningsappen, ett annat bibliotek med namnet azure-cosmos-cassandra-spark-helper med maven-koordinater
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
från Azure Cosmos DB för att hantera hastighetsbegränsning. Det här biblioteket innehåller anpassade anslutningsfabriks- och återförsöksprincipklasser.Återförsöksprincipen i Azure Cosmos DB är konfigurerad för att hantera HTTP-statuskod 429("Begärandefrekvens stor") undantag. Azure Cosmos DB för Apache Cassandra översätter dessa undantag till överlagrade fel i det interna Cassandra-protokollet och du kan försöka igen med back-offs. Eftersom Azure Cosmos DB använder en etablerad dataflödesmodell uppstår undantag för frekvensbegränsning för begäranden när ingress-/utgående priser ökar. Återförsöksprincipen skyddar dina spark-jobb mot datatoppar som tillfälligt överskrider det allokerade dataflödet för containern. Om du använder Spark 3.x-anslutningsappen krävs inte implementering av det här biblioteket.
Kommentar
Återförsöksprincipen kan endast skydda dina spark-jobb mot momentära toppar. Om du inte har konfigurerat tillräckligt många RU:er som krävs för att köra din arbetsbelastning är återförsöksprincipen inte tillämplig och återförsöksprincipklassen återanvänder undantaget.
Anslutningsinformation för Azure Cosmos DB-konto: Ditt Azure API för Cassandra-kontonamn, kontoslutpunkt och nyckel.
Optimera konfiguration av Spark-anslutningstjänstens dataflöde
I nästa avsnitt visas alla relevanta parametrar för att kontrollera dataflödet med spark-anslutningsappen för Cassandra. För att optimera parametrarna för att maximera dataflödet för spark-jobb spark.cassandra.output.concurrent.writes
måste konfigurationerna , spark.cassandra.concurrent.reads
, och spark.cassandra.input.reads_per_sec
vara korrekt konfigurerade för att undvika för mycket begränsning och back-off (vilket i sin tur kan leda till lägre dataflöde).
Det optimala värdet för dessa konfigurationer beror på fyra faktorer:
- Mängden dataflöde (enheter för begäran) som konfigurerats för den tabell som data matas in i.
- Antalet arbetare i Spark-klustret.
- Antalet köre som konfigurerats för spark-jobbet (som kan styras med eller
spark.cassandra.connection.connections_per_executor_max
spark.cassandra.connection.remoteConnectionsPerExecutor
beroende på Spark-version) - Den genomsnittliga svarstiden för varje begäran till Azure Cosmos DB, om du är samlad i samma datacenter. Anta att det här värdet är 10 ms för skrivningar och 3 ms för läsningar.
Om vi till exempel har fem arbetare och värdet spark.cassandra.output.concurrent.writes
= 1 och värdet spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, så har vi fem arbetare som samtidigt skriver till tabellen, var och en med en tråd. Om det tar 10 ms att utföra en enda skrivning kan vi skicka 100 begäranden (1 000 millisekunder dividerat med 10) per sekund, per tråd. Med fem arbetare skulle detta vara 500 skrivningar per sekund. Till en genomsnittlig kostnad av fem enheter för begäranden (RU:er) per skrivning skulle måltabellen behöva minst 2 500 enheter för begäranden etablerade (5 RU:er x 500 skrivningar per sekund).
Om du ökar antalet utförare kan du öka antalet trådar i ett visst jobb, vilket i sin tur kan öka dataflödet. Den exakta effekten av detta kan dock vara variabel beroende på jobbet, medan det är mer deterministiskt att kontrollera dataflödet med antalet arbetare. Du kan också fastställa den exakta kostnaden för en viss begäran genom att profilera den för att hämta ru-avgiften (Request Unit). Detta hjälper dig att vara mer exakt när du etablerar dataflöde för tabellen eller nyckelområdet. Ta en titt på vår artikel här för att förstå hur du får avgifter för begärandeenheten på en nivå per begäran.
Skala dataflöde i databasen
Cassandra Spark-anslutningsappen mättar dataflödet effektivt i Azure Cosmos DB. Därför måste du, även med effektiva återförsök, se till att du har tillräckligt med dataflöde (RU:er) på tabell- eller nyckelområdesnivå för att förhindra hastighetsbegränsning av relaterade fel. Den minsta inställningen på 400 RU:er i en viss tabell eller nyckelrymd räcker inte. Även vid minsta konfigurationsinställningar för dataflöde kan Spark-anslutningsappen skriva med en hastighet som motsvarar cirka 6 000 enheter för begäran eller mer.
Om RU-inställningen som krävs för dataflytt med Spark är högre än vad som krävs för din arbetsbelastning med stabilt tillstånd kan du enkelt skala upp och ned dataflödet systematiskt i Azure Cosmos DB för att uppfylla arbetsbelastningens behov under en viss tidsperiod. Läs vår artikel om elastisk skalning i API för Cassandra för att förstå de olika alternativen för skalning programmatiskt och dynamiskt.
Kommentar
Riktlinjerna ovan förutsätter en någorlunda enhetlig fördelning av data. Om du har en betydande snedställning i data (dvs. ett omåttligt stort antal läsningar/skrivningar till samma partitionsnyckelvärde) kan det fortfarande uppstå flaskhalsar, även om du har ett stort antal enheter för begäranden etablerade i tabellen. Enheter för begärande delas lika mellan fysiska partitioner, och stora datasnedvridningar kan orsaka en flaskhals med begäranden till en enda partition.
Konfigurationsparametrar för Spark-anslutningstjänstens dataflöde
I följande tabell visas konfigurationsparametrar för Azure Cosmos DB för Apache Cassandra-specifika dataflöde som tillhandahålls av anslutningsappen. En detaljerad lista över alla konfigurationsparametrar finns på konfigurationsreferenssidan för GitHub-lagringsplatsen för Spark Cassandra Connector.
Egenskapsnamn | Standardvärde | Beskrivning |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Antal rader per enskild batch. Ange den här parametern till 1. Den här parametern används för att uppnå högre dataflöde för tunga arbetsbelastningar. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Ingen | Maximalt antal anslutningar per nod per köre. 10*n motsvarar 10 anslutningar per nod i ett Cassandra-kluster med n-nod. Så om du behöver fem anslutningar per nod per köre för ett Cassandra-kluster med fem noder bör du ange den här konfigurationen till 25. Ändra det här värdet baserat på graden av parallellitet eller antalet utförare som spark-jobben har konfigurerats för. |
spark.cassandra.output.concurrent.writes | 100 | Definierar antalet parallella skrivningar som kan ske per köre. Eftersom du anger "batch.size.rows" till 1 skalar du upp det här värdet därefter. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning. |
spark.cassandra.concurrent.reads | 512 | Definierar antalet parallella läsningar som kan ske per köre. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning |
spark.cassandra.output.throughput_mb_per_sec | Ingen | Definierar det totala skrivdataflödet per köre. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet för din Azure Cosmos DB-container. |
spark.cassandra.input.reads_per_sec | Ingen | Definierar det totala läsdataflödet per köre. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet för din Azure Cosmos DB-container. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definierar antalet batchar per enskild spark-uppgift som kan lagras i minnet innan de skickas till API:et för Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Definierar den tidsperiod till vilken oanvända anslutningar är tillgängliga. |
Justera dataflödet och graden av parallellitet för dessa parametrar baserat på den arbetsbelastning du förväntar dig för dina Spark-jobb och det dataflöde som du har etablerat för ditt Azure Cosmos DB-konto.
Ansluta till Azure Cosmos DB för Apache Cassandra från Spark
cqlsh
Följande kommandon beskriver hur du ansluter till Azure Cosmos DB för Apache Cassandra från cqlsh. Det här är användbart för validering när du kör exemplen i Spark.
Från Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
Artikeln nedan beskriver Azure Databricks-klusteretablering, klusterkonfiguration för anslutning till Azure Cosmos DB för Apache Cassandra och flera exempel på notebook-filer som omfattar DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB för Apache Cassandra från Azure Databricks
2. Azure HDInsight-Spark
Artikeln nedan beskriver HDinsight-Spark-tjänsten, etablering, klusterkonfiguration för anslutning till Azure Cosmos DB för Apache Cassandra och flera exempel på notebook-filer som omfattar DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB för Apache Cassandra från Azure HDInsight-Spark
3. Spark-miljö i allmänhet
Avsnitten ovan var specifika för Azure Spark-baserade PaaS-tjänster, men det här avsnittet beskriver alla allmänna Spark-miljöer. Anslutningsberoenden, importer och Spark-sessionskonfiguration beskrivs nedan. Avsnittet "Nästa steg" beskriver kodexempel för DDL-åtgärder, DML-åtgärder med mera.
Anslutningsberoenden:
- Lägg till maven-koordinaterna för att hämta Cassandra-anslutningsappen för Spark
- Lägg till maven-koordinaterna för Azure Cosmos DB-hjälpbiblioteket för API för Cassandra
Import:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Spark-sessionskonfiguration:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nästa steg
Följande artiklar visar Spark-integrering med Azure Cosmos DB för Apache Cassandra.