Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark
VAN TOEPASSING OP: Cassandra
Dit artikel is een van de artikelen in Azure Cosmos DB voor Apache Cassandra-integratie vanuit Spark. De artikelen hebben betrekking op connectiviteit, DDL-bewerkingen (Data Definition Language), eenvoudige DML-bewerkingen (Data Manipulation Language) en geavanceerde Azure Cosmos DB voor Apache Cassandra-integratie vanuit Spark.
Vereisten
Richt een Azure Cosmos DB in voor een Apache Cassandra-account.
Uw keuze voor Spark-omgeving inrichten [Azure Databricks | Azure HDInsight-Spark | Overige].
Afhankelijkheden voor connectiviteit
Spark-connector voor Cassandra: Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB voor Apache Cassandra. Identificeer en gebruik de versie van de connector in Maven Central die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die Ondersteuning biedt voor Spark 3.2.1 of hoger en de Spark-connector die beschikbaar is op maven-coördinaten
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
. Als u Spark 2.x gebruikt, raden we een omgeving aan met Spark-versie 2.4.5, met behulp van spark-connector op maven-coördinatencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
.Azure Cosmos DB-helperbibliotheek voor API voor Cassandra: Als u een versie van Spark 2.x gebruikt, hebt u naast de Spark-connector een andere bibliotheek nodig met de naam azure-cosmos-cassandra-spark-helper met maven-coördinaten
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
van Azure Cosmos DB om snelheidsbeperking af te handelen. Deze bibliotheek bevat aangepaste verbindingsfactory- en beleidsklassen voor opnieuw proberen.Het beleid voor opnieuw proberen in Azure Cosmos DB is geconfigureerd voor het afhandelen van HTTP-statuscode 429("Request Rate Large")-uitzonderingen. De Azure Cosmos DB voor Apache Cassandra vertaalt deze uitzonderingen in overbelaste fouten in het systeemeigen Cassandra-protocol en u kunt het opnieuw proberen met back-offs. Omdat Azure Cosmos DB gebruikmaakt van een ingericht doorvoermodel, treden aanvraagsnelheidsbeperkingsonderingen op wanneer de snelheid voor inkomend/uitgaand verkeer toeneemt. Het beleid voor opnieuw proberen beschermt uw Spark-taken tegen gegevenspieken die tijdelijk de doorvoer overschrijden die is toegewezen voor uw container. Als u de Spark 3.x-connector gebruikt, is het implementeren van deze bibliotheek niet vereist.
Notitie
Het beleid voor opnieuw proberen kan uw spark-taken alleen beschermen tegen tijdelijke pieken. Als u onvoldoende RU's hebt geconfigureerd die nodig zijn om uw workload uit te voeren, is het beleid voor opnieuw proberen niet van toepassing en wordt de uitzondering opnieuw uitgevoerd door de beleidsklasse voor opnieuw proberen.
Verbindingsgegevens voor Azure Cosmos DB-account: uw Azure-API voor Cassandra-accountnaam, accounteindpunt en -sleutel.
Doorvoerconfiguratie van Spark-connector optimaliseren
In de volgende sectie staan alle relevante parameters voor het beheren van de doorvoer met behulp van de Spark Connector voor Cassandra. Om parameters te optimaliseren om de doorvoer voor Spark-taken te maximaliseren, moeten de spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
en spark.cassandra.input.reads_per_sec
configuraties correct worden geconfigureerd om te voorkomen dat er te veel beperking en back-off (wat op zijn beurt kan leiden tot lagere doorvoer).
De optimale waarde van deze configuraties is afhankelijk van vier factoren:
- De hoeveelheid doorvoer (aanvraageenheden) die is geconfigureerd voor de tabel waarin gegevens worden opgenomen.
- Het aantal werkrollen in uw Spark-cluster.
- Het aantal uitvoerders dat is geconfigureerd voor uw Spark-taak (die kan worden beheerd met
spark.cassandra.connection.connections_per_executor_max
ofspark.cassandra.connection.remoteConnectionsPerExecutor
afhankelijk van de Spark-versie) - De gemiddelde latentie van elke aanvraag naar Azure Cosmos DB als u zich in hetzelfde datacenter bevindt. Stel dat deze waarde 10 ms is voor schrijfbewerkingen en 3 ms voor leesbewerkingen.
Als we bijvoorbeeld vijf werkrollen hebben en een waarde van spark.cassandra.output.concurrent.writes
= 1 en een waarde van spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, hebben we vijf werkrollen die gelijktijdig in de tabel worden geschreven, elk met één thread. Als het 10 ms duurt om één schrijfbewerking uit te voeren, kunnen we per thread 100 aanvragen (1000 milliseconden gedeeld door 10) per seconde verzenden. Met vijf werknemers zou dit 500 schrijfbewerkingen per seconde zijn. Tegen een gemiddelde kosten van vijf aanvraageenheden (RU's) per schrijfbewerking moet de doeltabel minimaal 2500 aanvraageenheden inrichten (5 RU's x 500 schrijfbewerkingen per seconde).
Door het aantal uitvoerders te verhogen, kan het aantal threads in een bepaalde taak toenemen, waardoor de doorvoer op zijn beurt kan worden verhoogd. De exacte impact hiervan kan echter variabel zijn, afhankelijk van de taak, terwijl het beheren van de doorvoer met het aantal werkrollen meer deterministisch is. U kunt ook de exacte kosten van een bepaalde aanvraag bepalen door deze te profileren om de ru-kosten (Request Unit) op te halen. Dit helpt u om nauwkeuriger te zijn bij het inrichten van doorvoer voor uw tabel of keyspace. Bekijk ons artikel hier voor meer informatie over het ophalen van kosten per aanvraageenheid per aanvraagniveau.
Doorvoer schalen in de database
De Cassandra Spark-connector zorgt voor een efficiënte doorvoer in Azure Cosmos DB. Als gevolg hiervan moet u, zelfs bij effectieve nieuwe pogingen, ervoor zorgen dat u voldoende doorvoer (RU's) op tabel- of keyspaceniveau hebt ingericht om snelheidsbeperkingsfouten te voorkomen. De minimuminstelling van 400 RU's in een bepaalde tabel of keyspace is niet voldoende. Zelfs bij minimale configuratie-instellingen voor doorvoer kan de Spark-connector schrijven met een snelheid die overeenkomt met ongeveer 6000 aanvraageenheden of meer.
Als de RU-instelling die is vereist voor gegevensverplaatsing met Behulp van Spark hoger is dan wat vereist is voor uw werkbelasting met een stabiele status, kunt u de doorvoer eenvoudig systematisch omhoog en omlaag schalen in Azure Cosmos DB om te voldoen aan de behoeften van uw workload gedurende een bepaalde periode. Lees ons artikel over elastisch schalen in API voor Cassandra voor inzicht in de verschillende opties voor programmatisch en dynamisch schalen.
Notitie
In de bovenstaande richtlijnen wordt uitgegaan van een redelijk uniforme verdeling van gegevens. Als u een aanzienlijke scheefheid in de gegevens hebt (dat wil gezegd, een buitengewoon groot aantal lees-/schrijfbewerkingen naar dezelfde partitiesleutelwaarde), kunnen er nog steeds knelpunten optreden, zelfs als u een groot aantal aanvraageenheden hebt ingericht in uw tabel. Aanvraageenheden worden gelijkmatig verdeeld over fysieke partities en zware scheeftrekken van gegevens kunnen leiden tot een knelpunt van aanvragen naar één partitie.
Configuratieparameters voor doorvoer van Spark-connector
De volgende tabel bevat azure Cosmos DB voor configuratieparameters voor doorvoer die specifiek zijn voor Apache Cassandra die door de connector worden geleverd. Zie de configuratiereferentiepagina van de GitHub-opslagplaats van de Spark Cassandra-connector voor een gedetailleerde lijst met alle configuratieparameters.
Naam van eigenschap | Standaardwaarde | Beschrijving |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Aantal rijen per batch. Stel deze parameter in op 1. Deze parameter wordt gebruikt om een hogere doorvoer te bereiken voor zware werkbelastingen. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Geen | Maximum aantal verbindingen per knooppunt per uitvoerder. 10*n is gelijk aan 10 verbindingen per knooppunt in een Cassandra-cluster met n knooppunten. Dus als u vijf verbindingen per knooppunt per uitvoerder nodig hebt voor een Cassandra-cluster met vijf knooppunten, moet u deze configuratie instellen op 25. Wijzig deze waarde op basis van de mate van parallelle uitvoering of het aantal uitvoerders waarvoor uw Spark-taken zijn geconfigureerd. |
spark.cassandra.output.concurrent.writes | 100 | Hiermee definieert u het aantal parallelle schrijfbewerkingen dat per uitvoerder kan optreden. Omdat u 'batch.size.rows' instelt op 1, moet u deze waarde dienovereenkomstig omhoog schalen. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload. |
spark.cassandra.concurrent.reads | 512 | Hiermee definieert u het aantal parallelle leesbewerkingen dat per uitvoerder kan optreden. Wijzig deze waarde op basis van de mate van parallelle uitvoering of de doorvoer die u wilt bereiken voor uw workload |
spark.cassandra.output.throughput_mb_per_sec | Geen | Hiermee definieert u de totale schrijfdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en baseer deze op de ingerichte doorvoer van uw Azure Cosmos DB-container. |
spark.cassandra.input.reads_per_sec | Geen | Hiermee definieert u de totale leesdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en baseer deze op de ingerichte doorvoer van uw Azure Cosmos DB-container. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definieert het aantal batches per enkele Spark-taak die in het geheugen kan worden opgeslagen voordat deze naar de API voor Cassandra wordt verzonden |
spark.cassandra.connection.keep_alive_ms | 60000 | Hiermee definieert u de periode totdat ongebruikte verbindingen beschikbaar zijn. |
Pas de doorvoer en mate van parallelle uitvoering van deze parameters aan op basis van de workload die u verwacht voor uw Spark-taken en de doorvoer die u hebt ingericht voor uw Azure Cosmos DB-account.
Verbinding maken met Azure Cosmos DB voor Apache Cassandra vanuit Spark
cqlsh
In de volgende opdrachten wordt beschreven hoe u vanuit cqlsh verbinding maakt met Azure Cosmos DB voor Apache Cassandra. Dit is handig voor validatie wanneer u de voorbeelden in Spark doorloopt.
Van Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
In het onderstaande artikel wordt het inrichten van Azure Databricks-clusters beschreven, de clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure Databricks
2. Azure HDInsight-Spark
Het onderstaande artikel bevat informatie over de HDinsight-Spark-service, inrichting, clusterconfiguratie voor het maken van verbinding met Azure Cosmos DB voor Apache Cassandra en verschillende voorbeeldnotebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB voor Apache Cassandra vanuit Azure HDInsight-Spark
3. Spark-omgeving in het algemeen
Hoewel de bovenstaande secties specifiek waren voor PaaS-services op basis van Azure Spark, worden in deze sectie alle algemene Spark-omgevingen behandeld. De configuratie van connectorafhankelijkheden, import- en Spark-sessies wordt hieronder beschreven. De sectie 'Volgende stappen' bevat codevoorbeelden voor DDL-bewerkingen, DML-bewerkingen en meer.
Connectorafhankelijkheden:
- De maven-coördinaten toevoegen om de Cassandra-connector voor Spark op te halen
- De maven-coördinaten voor de Azure Cosmos DB-helperbibliotheek voor API voor Cassandra toevoegen
Invoer:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Configuratie van Spark-sessie:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Volgende stappen
In de volgende artikelen ziet u de Integratie van Spark met Azure Cosmos DB voor Apache Cassandra.