Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark
DOTYCZY: Kasandra
Ten artykuł jest jednym z serii artykułów na temat integracji usługi Azure Cosmos DB for Apache Cassandra z platformy Spark. Artykuły obejmują operacje łączności, operacje języka definicji danych (DDL), podstawowe operacje języka manipulowania danymi (DML) oraz zaawansowaną integrację usługi Azure Cosmos DB z platformą Apache Cassandra z platformy Spark.
Wymagania wstępne
Aprowizuj konto usługi Azure Cosmos DB dla bazy danych Apache Cassandra.
Aprowizuj wybrane środowisko Spark [Azure Databricks | Azure HDInsight-Spark | Inne].
Zależności dotyczące łączności
Łącznik Platformy Spark dla rozwiązania Cassandra: łącznik spark służy do nawiązywania połączenia z usługą Azure Cosmos DB dla usługi Apache Cassandra. Zidentyfikuj i użyj wersji łącznika znajdującego się w centrum programu Maven, która jest zgodna z wersjami spark i Scala środowiska Spark. Zalecamy środowisko obsługujące platformę Spark w wersji 3.2.1 lub nowszej, a łącznik spark dostępny na współrzędnych
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
maven . Jeśli używasz platformy Spark 2.x, zalecamy środowisko z platformą Spark w wersji 2.4.5 przy użyciu łącznika spark na współrzędnychcom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
narzędzia Maven.Biblioteka pomocnika usługi Azure Cosmos DB dla interfejsu API dla rozwiązania Cassandra: jeśli używasz wersji Spark 2.x, oprócz łącznika Spark potrzebujesz innej biblioteki o nazwie azure-cosmos-cassandra-spark-helper z współrzędnymi
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
maven z usługi Azure Cosmos DB w celu obsługi ograniczania szybkości. Ta biblioteka zawiera niestandardową fabrykę połączeń i klasy zasad ponawiania prób.Zasady ponawiania prób w usłudze Azure Cosmos DB są skonfigurowane do obsługi wyjątków kodu stanu HTTP 429 ("Żądaj dużej liczby żądań"). Usługa Azure Cosmos DB dla systemu Apache Cassandra tłumaczy te wyjątki na przeciążone błędy w natywnym protokole Cassandra i można ponowić próbę z wycofywaniem. Ponieważ usługa Azure Cosmos DB używa modelu aprowizowanej przepływności, wyjątki ograniczania szybkości żądań występują po wzroście szybkości ruchu przychodzącego/wychodzącego. Zasady ponawiania prób chronią zadania platformy Spark przed skokami danych, które chwilowo przekraczają przepływność przydzieloną dla kontenera. W przypadku korzystania z łącznika Spark 3.x implementacja tej biblioteki nie jest wymagana.
Uwaga
Zasady ponawiania prób mogą chronić zadania platformy Spark tylko przed nagłymi skokami. Jeśli nie skonfigurowano wystarczającej liczby jednostek RU wymaganych do uruchomienia obciążenia, zasady ponawiania nie mają zastosowania, a klasa zasad ponawiania spowoduje ponowne wywołanie wyjątku.
Szczegóły połączenia konta usługi Azure Cosmos DB: Twój interfejs API platformy Azure dla nazwy konta Cassandra, punktu końcowego konta i klucza.
Optymalizowanie konfiguracji przepływności łącznika platformy Spark
Wymienione w następnej sekcji są wszystkimi odpowiednimi parametrami do kontrolowania przepływności przy użyciu łącznika Spark dla bazy danych Cassandra. Aby zoptymalizować parametry w celu zmaksymalizowania przepływności zadań platformy Spark, spark.cassandra.output.concurrent.writes
konfiguracje , spark.cassandra.concurrent.reads
i spark.cassandra.input.reads_per_sec
muszą być prawidłowo skonfigurowane, aby uniknąć zbyt dużej przepustowości i wycofywania (co z kolei może prowadzić do obniżenia przepływności).
Optymalna wartość tych konfiguracji zależy od czterech czynników:
- Ilość przepływności (jednostek żądań) skonfigurowana dla tabeli, w ramach których są pozyskiwane dane.
- Liczba procesów roboczych w klastrze Spark.
- Liczba funkcji wykonawczych skonfigurowanych dla zadania platformy Spark (które można kontrolować przy użyciu
spark.cassandra.connection.connections_per_executor_max
lubspark.cassandra.connection.remoteConnectionsPerExecutor
w zależności od wersji platformy Spark) - Średnie opóźnienie każdego żądania w usłudze Azure Cosmos DB, jeśli jest sortowane w tym samym centrum danych. Załóżmy, że ta wartość to 10 ms dla operacji zapisu i 3 ms dla operacji odczytu.
Jeśli na przykład mamy pięć procesów roboczych i wartość spark.cassandra.output.concurrent.writes
= 1, a wartość spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, mamy pięć procesów roboczych, które jednocześnie zapisują się w tabeli, z których każdy ma jeden wątek. Jeśli wykonanie pojedynczego zapisu zajmuje 10 ms, możemy wysłać 100 żądań (1000 milisekund podzielonych przez 10) na sekundę, na wątek. W przypadku pięciu procesów roboczych byłoby to 500 zapisów na sekundę. W przypadku średniego kosztu pięciu jednostek żądań (RU) na zapis tabela docelowa wymaga aprowizowanych co najmniej 2500 jednostek żądań (5 jednostek RU x 500 zapisów na sekundę).
Zwiększenie liczby funkcji wykonawczych może zwiększyć liczbę wątków w danym zadaniu, co z kolei może zwiększyć przepływność. Jednak dokładny wpływ tej metody może być zmienny w zależności od zadania, a kontrolowanie przepływności przy użyciu liczby procesów roboczych jest bardziej deterministyczne. Możesz również określić dokładny koszt danego żądania, profilowanie go w celu pobrania opłaty za jednostkę żądania (RU). Pomoże to być dokładniejsze podczas aprowizowania przepływności dla tabeli lub przestrzeni kluczy. Zapoznaj się z naszym artykułem tutaj , aby dowiedzieć się, jak uzyskać opłaty za jednostkę żądania na poziomie żądania.
Skalowanie przepływności w bazie danych
Łącznik Cassandra Spark wydajnie sytuuje przepływność w usłudze Azure Cosmos DB. W związku z tym nawet w przypadku skutecznych ponownych prób należy upewnić się, że masz wystarczającą przepływność (RU) aprowizowaną na poziomie tabeli lub przestrzeni kluczy, aby zapobiec ograniczaniu liczby powiązanych błędów. Minimalne ustawienie 400 jednostek RU w danej tabeli lub przestrzeni kluczy nie będzie wystarczające. Nawet przy minimalnych ustawieniach konfiguracji przepływności łącznik platformy Spark może zapisywać w tempie odpowiadającym około 6000 jednostkom żądań lub więcej.
Jeśli ustawienie jednostek RU wymagane do przenoszenia danych przy użyciu platformy Spark jest wyższe niż to, co jest wymagane dla obciążenia w stanie stałym, możesz łatwo skalować przepływność w górę i w dół w usłudze Azure Cosmos DB, aby zaspokoić potrzeby obciążenia w danym okresie. Przeczytaj nasz artykuł na temat elastycznej skali w interfejsie API dla rozwiązania Cassandra , aby zrozumieć różne opcje skalowania programowo i dynamicznie.
Uwaga
Powyższe wskazówki zakładają rozsądną jednolitą dystrybucję danych. Jeśli w danych występuje znaczna niesymetryczność (czyli zbyt duża liczba operacji odczytu/zapisu w tej samej wartości klucza partycji), nadal mogą wystąpić wąskie gardła, nawet jeśli w tabeli jest aprowizowana duża liczba jednostek żądań. Jednostki żądań są dzielone równomiernie między partycje fizyczne, a duże niesymetryczność danych może spowodować wąskie gardło żądań do pojedynczej partycji.
Parametry konfiguracji przepływności łącznika platformy Spark
W poniższej tabeli wymieniono parametry konfiguracji przepływności specyficzne dla usługi Azure Cosmos DB dla usługi Apache Cassandra udostępniane przez łącznik. Aby uzyskać szczegółową listę wszystkich parametrów konfiguracji, zobacz stronę referencyjną konfiguracji repozytorium GitHub łącznika Spark Cassandra.
Nazwa właściwości | Wartość domyślna | Opis |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Liczba wierszy na pojedynczą partię. Ustaw ten parametr na wartość 1. Ten parametr służy do osiągnięcia wyższej przepływności w przypadku dużych obciążeń. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Brak | Maksymalna liczba połączeń na węzeł na funkcję wykonawcza. 10*n jest równoważne 10 połączeniam na węzeł w klastrze Cassandra n-node. Dlatego jeśli potrzebujesz pięciu połączeń na węzeł na funkcję wykonawcza dla pięciu węzłów klastra Cassandra, należy ustawić tę konfigurację na 25. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub liczby funkcji wykonawczych skonfigurowanych dla zadań platformy Spark. |
spark.cassandra.output.concurrent.writes | 100 | Definiuje liczbę zapisów równoległych, które mogą wystąpić dla funkcji wykonawczej. Ponieważ ustawiono wartość "batch.size.rows" na 1, pamiętaj, aby odpowiednio skalować tę wartość w górę. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub przepływności, którą chcesz osiągnąć dla obciążenia. |
spark.cassandra.concurrent.reads | 512 | Definiuje liczbę operacji odczytu równoległego, które mogą wystąpić dla funkcji wykonawczej. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub przepływności, którą chcesz osiągnąć dla obciążenia |
spark.cassandra.output.throughput_mb_per_sec | Brak | Definiuje łączną przepływność zapisu na funkcję wykonawczą. Ten parametr może służyć jako górny limit przepływności zadania platformy Spark i opierać go na aprowizowanej przepływności kontenera usługi Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | Brak | Definiuje łączną przepływność odczytu na funkcję wykonawcza. Ten parametr może służyć jako górny limit przepływności zadania platformy Spark i opierać go na aprowizowanej przepływności kontenera usługi Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Definiuje liczbę partii na jedno zadanie spark, które można przechowywać w pamięci przed wysłaniem do interfejsu API dla usługi Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Definiuje okres czasu, do którego są dostępne nieużywane połączenia. |
Dostosuj przepływność i stopień równoległości tych parametrów na podstawie oczekiwanego obciążenia dla zadań platformy Spark oraz przepływności aprowizowanej dla konta usługi Azure Cosmos DB.
Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark
cqlsh
Poniższe polecenia szczegółowo opisują sposób nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu narzędzia cqlsh. Jest to przydatne do weryfikacji podczas uruchamiania przykładów na platformie Spark.
Z systemu Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
W poniższym artykule opisano aprowizowanie klastra usługi Azure Databricks, konfigurację klastra na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra oraz kilka przykładowych notesów, które obejmują operacje DDL, operacje DML i inne.
Praca z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z usługi Azure Databricks
2. Azure HDInsight-Spark
W poniższym artykule opisano usługę HDinsight-Spark, aprowizację, konfigurację klastra na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra oraz kilka przykładowych notesów, które obejmują operacje DDL, operacje DML i inne.
Praca z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu usługi Azure HDInsight-Spark
3. Środowisko platformy Spark ogólnie
Chociaż powyższe sekcje były specyficzne dla usług PaaS opartych na platformie Azure na platformie Azure, w tej sekcji opisano dowolne ogólne środowisko platformy Spark. Poniżej opisano zależności łącznika, importy i konfigurację sesji platformy Spark. Sekcja "Następne kroki" zawiera przykłady kodu dla operacji DDL, operacji DML i nie tylko.
Zależności łącznika:
- Dodawanie współrzędnych maven w celu pobrania łącznika Cassandra dla platformy Spark
- Dodawanie współrzędnych maven dla biblioteki pomocnika usługi Azure Cosmos DB dla interfejsu API dla bazy danych Cassandra
Przywozu:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Konfiguracja sesji platformy Spark:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Następne kroki
W poniższych artykułach przedstawiono integrację platformy Spark z usługą Azure Cosmos DB for Apache Cassandra.