Udostępnij za pośrednictwem


Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark

DOTYCZY: Kasandra

Ten artykuł jest jednym z serii artykułów na temat integracji usługi Azure Cosmos DB for Apache Cassandra z platformy Spark. Artykuły obejmują operacje łączności, operacje języka definicji danych (DDL), podstawowe operacje języka manipulowania danymi (DML) oraz zaawansowaną integrację usługi Azure Cosmos DB z platformą Apache Cassandra z platformy Spark.

Wymagania wstępne

Zależności dotyczące łączności

  • Łącznik Platformy Spark dla rozwiązania Cassandra: łącznik spark służy do nawiązywania połączenia z usługą Azure Cosmos DB dla usługi Apache Cassandra. Zidentyfikuj i użyj wersji łącznika znajdującego się w centrum programu Maven, która jest zgodna z wersjami spark i Scala środowiska Spark. Zalecamy środowisko obsługujące platformę Spark w wersji 3.2.1 lub nowszej, a łącznik spark dostępny na współrzędnych com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0maven . Jeśli używasz platformy Spark 2.x, zalecamy środowisko z platformą Spark w wersji 2.4.5 przy użyciu łącznika spark na współrzędnych com.datastax.spark:spark-cassandra-connector_2.11:2.4.3narzędzia Maven.

  • Biblioteka pomocnika usługi Azure Cosmos DB dla interfejsu API dla rozwiązania Cassandra: jeśli używasz wersji Spark 2.x, oprócz łącznika Spark potrzebujesz innej biblioteki o nazwie azure-cosmos-cassandra-spark-helper z współrzędnymi com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 maven z usługi Azure Cosmos DB w celu obsługi ograniczania szybkości. Ta biblioteka zawiera niestandardową fabrykę połączeń i klasy zasad ponawiania prób.

    Zasady ponawiania prób w usłudze Azure Cosmos DB są skonfigurowane do obsługi wyjątków kodu stanu HTTP 429 ("Żądaj dużej liczby żądań"). Usługa Azure Cosmos DB dla systemu Apache Cassandra tłumaczy te wyjątki na przeciążone błędy w natywnym protokole Cassandra i można ponowić próbę z wycofywaniem. Ponieważ usługa Azure Cosmos DB używa modelu aprowizowanej przepływności, wyjątki ograniczania szybkości żądań występują po wzroście szybkości ruchu przychodzącego/wychodzącego. Zasady ponawiania prób chronią zadania platformy Spark przed skokami danych, które chwilowo przekraczają przepływność przydzieloną dla kontenera. W przypadku korzystania z łącznika Spark 3.x implementacja tej biblioteki nie jest wymagana.

    Uwaga

    Zasady ponawiania prób mogą chronić zadania platformy Spark tylko przed nagłymi skokami. Jeśli nie skonfigurowano wystarczającej liczby jednostek RU wymaganych do uruchomienia obciążenia, zasady ponawiania nie mają zastosowania, a klasa zasad ponawiania spowoduje ponowne wywołanie wyjątku.

  • Szczegóły połączenia konta usługi Azure Cosmos DB: Twój interfejs API platformy Azure dla nazwy konta Cassandra, punktu końcowego konta i klucza.

Optymalizowanie konfiguracji przepływności łącznika platformy Spark

Wymienione w następnej sekcji są wszystkimi odpowiednimi parametrami do kontrolowania przepływności przy użyciu łącznika Spark dla bazy danych Cassandra. Aby zoptymalizować parametry w celu zmaksymalizowania przepływności zadań platformy Spark, spark.cassandra.output.concurrent.writeskonfiguracje , spark.cassandra.concurrent.readsi spark.cassandra.input.reads_per_sec muszą być prawidłowo skonfigurowane, aby uniknąć zbyt dużej przepustowości i wycofywania (co z kolei może prowadzić do obniżenia przepływności).

Optymalna wartość tych konfiguracji zależy od czterech czynników:

  • Ilość przepływności (jednostek żądań) skonfigurowana dla tabeli, w ramach których są pozyskiwane dane.
  • Liczba procesów roboczych w klastrze Spark.
  • Liczba funkcji wykonawczych skonfigurowanych dla zadania platformy Spark (które można kontrolować przy użyciu spark.cassandra.connection.connections_per_executor_max lub spark.cassandra.connection.remoteConnectionsPerExecutor w zależności od wersji platformy Spark)
  • Średnie opóźnienie każdego żądania w usłudze Azure Cosmos DB, jeśli jest sortowane w tym samym centrum danych. Załóżmy, że ta wartość to 10 ms dla operacji zapisu i 3 ms dla operacji odczytu.

Jeśli na przykład mamy pięć procesów roboczych i wartość spark.cassandra.output.concurrent.writes= 1, a wartość spark.cassandra.connection.remoteConnectionsPerExecutor = 1, mamy pięć procesów roboczych, które jednocześnie zapisują się w tabeli, z których każdy ma jeden wątek. Jeśli wykonanie pojedynczego zapisu zajmuje 10 ms, możemy wysłać 100 żądań (1000 milisekund podzielonych przez 10) na sekundę, na wątek. W przypadku pięciu procesów roboczych byłoby to 500 zapisów na sekundę. W przypadku średniego kosztu pięciu jednostek żądań (RU) na zapis tabela docelowa wymaga aprowizowanych co najmniej 2500 jednostek żądań (5 jednostek RU x 500 zapisów na sekundę).

Zwiększenie liczby funkcji wykonawczych może zwiększyć liczbę wątków w danym zadaniu, co z kolei może zwiększyć przepływność. Jednak dokładny wpływ tej metody może być zmienny w zależności od zadania, a kontrolowanie przepływności przy użyciu liczby procesów roboczych jest bardziej deterministyczne. Możesz również określić dokładny koszt danego żądania, profilowanie go w celu pobrania opłaty za jednostkę żądania (RU). Pomoże to być dokładniejsze podczas aprowizowania przepływności dla tabeli lub przestrzeni kluczy. Zapoznaj się z naszym artykułem tutaj , aby dowiedzieć się, jak uzyskać opłaty za jednostkę żądania na poziomie żądania.

Skalowanie przepływności w bazie danych

Łącznik Cassandra Spark wydajnie sytuuje przepływność w usłudze Azure Cosmos DB. W związku z tym nawet w przypadku skutecznych ponownych prób należy upewnić się, że masz wystarczającą przepływność (RU) aprowizowaną na poziomie tabeli lub przestrzeni kluczy, aby zapobiec ograniczaniu liczby powiązanych błędów. Minimalne ustawienie 400 jednostek RU w danej tabeli lub przestrzeni kluczy nie będzie wystarczające. Nawet przy minimalnych ustawieniach konfiguracji przepływności łącznik platformy Spark może zapisywać w tempie odpowiadającym około 6000 jednostkom żądań lub więcej.

Jeśli ustawienie jednostek RU wymagane do przenoszenia danych przy użyciu platformy Spark jest wyższe niż to, co jest wymagane dla obciążenia w stanie stałym, możesz łatwo skalować przepływność w górę i w dół w usłudze Azure Cosmos DB, aby zaspokoić potrzeby obciążenia w danym okresie. Przeczytaj nasz artykuł na temat elastycznej skali w interfejsie API dla rozwiązania Cassandra , aby zrozumieć różne opcje skalowania programowo i dynamicznie.

Uwaga

Powyższe wskazówki zakładają rozsądną jednolitą dystrybucję danych. Jeśli w danych występuje znaczna niesymetryczność (czyli zbyt duża liczba operacji odczytu/zapisu w tej samej wartości klucza partycji), nadal mogą wystąpić wąskie gardła, nawet jeśli w tabeli jest aprowizowana duża liczba jednostek żądań. Jednostki żądań są dzielone równomiernie między partycje fizyczne, a duże niesymetryczność danych może spowodować wąskie gardło żądań do pojedynczej partycji.

Parametry konfiguracji przepływności łącznika platformy Spark

W poniższej tabeli wymieniono parametry konfiguracji przepływności specyficzne dla usługi Azure Cosmos DB dla usługi Apache Cassandra udostępniane przez łącznik. Aby uzyskać szczegółową listę wszystkich parametrów konfiguracji, zobacz stronę referencyjną konfiguracji repozytorium GitHub łącznika Spark Cassandra.

Nazwa właściwości Wartość domyślna Opis
spark.cassandra.output.batch.size.rows 1 Liczba wierszy na pojedynczą partię. Ustaw ten parametr na wartość 1. Ten parametr służy do osiągnięcia wyższej przepływności w przypadku dużych obciążeń.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Brak Maksymalna liczba połączeń na węzeł na funkcję wykonawcza. 10*n jest równoważne 10 połączeniam na węzeł w klastrze Cassandra n-node. Dlatego jeśli potrzebujesz pięciu połączeń na węzeł na funkcję wykonawcza dla pięciu węzłów klastra Cassandra, należy ustawić tę konfigurację na 25. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub liczby funkcji wykonawczych skonfigurowanych dla zadań platformy Spark.
spark.cassandra.output.concurrent.writes 100 Definiuje liczbę zapisów równoległych, które mogą wystąpić dla funkcji wykonawczej. Ponieważ ustawiono wartość "batch.size.rows" na 1, pamiętaj, aby odpowiednio skalować tę wartość w górę. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub przepływności, którą chcesz osiągnąć dla obciążenia.
spark.cassandra.concurrent.reads 512 Definiuje liczbę operacji odczytu równoległego, które mogą wystąpić dla funkcji wykonawczej. Zmodyfikuj tę wartość na podstawie stopnia równoległości lub przepływności, którą chcesz osiągnąć dla obciążenia
spark.cassandra.output.throughput_mb_per_sec Brak Definiuje łączną przepływność zapisu na funkcję wykonawczą. Ten parametr może służyć jako górny limit przepływności zadania platformy Spark i opierać go na aprowizowanej przepływności kontenera usługi Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Brak Definiuje łączną przepływność odczytu na funkcję wykonawcza. Ten parametr może służyć jako górny limit przepływności zadania platformy Spark i opierać go na aprowizowanej przepływności kontenera usługi Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definiuje liczbę partii na jedno zadanie spark, które można przechowywać w pamięci przed wysłaniem do interfejsu API dla usługi Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Definiuje okres czasu, do którego są dostępne nieużywane połączenia.

Dostosuj przepływność i stopień równoległości tych parametrów na podstawie oczekiwanego obciążenia dla zadań platformy Spark oraz przepływności aprowizowanej dla konta usługi Azure Cosmos DB.

Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark

cqlsh

Poniższe polecenia szczegółowo opisują sposób nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu narzędzia cqlsh. Jest to przydatne do weryfikacji podczas uruchamiania przykładów na platformie Spark.
Z systemu Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

W poniższym artykule opisano aprowizowanie klastra usługi Azure Databricks, konfigurację klastra na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra oraz kilka przykładowych notesów, które obejmują operacje DDL, operacje DML i inne.
Praca z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z usługi Azure Databricks

2. Azure HDInsight-Spark

W poniższym artykule opisano usługę HDinsight-Spark, aprowizację, konfigurację klastra na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra oraz kilka przykładowych notesów, które obejmują operacje DDL, operacje DML i inne.
Praca z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu usługi Azure HDInsight-Spark

3. Środowisko platformy Spark ogólnie

Chociaż powyższe sekcje były specyficzne dla usług PaaS opartych na platformie Azure na platformie Azure, w tej sekcji opisano dowolne ogólne środowisko platformy Spark. Poniżej opisano zależności łącznika, importy i konfigurację sesji platformy Spark. Sekcja "Następne kroki" zawiera przykłady kodu dla operacji DDL, operacji DML i nie tylko.

Zależności łącznika:

  1. Dodawanie współrzędnych maven w celu pobrania łącznika Cassandra dla platformy Spark
  2. Dodawanie współrzędnych maven dla biblioteki pomocnika usługi Azure Cosmos DB dla interfejsu API dla bazy danych Cassandra

Przywozu:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Konfiguracja sesji platformy Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Następne kroki

W poniższych artykułach przedstawiono integrację platformy Spark z usługą Azure Cosmos DB for Apache Cassandra.