Udostępnij za pośrednictwem


Migrowanie danych z bazy danych Cassandra do konta usługi Azure Cosmos DB dla bazy danych Apache Cassandra przy użyciu usługi Azure Databricks

DOTYCZY: Kasandra

Interfejs API dla bazy danych Cassandra w usłudze Azure Cosmos DB stał się doskonałym wyborem dla obciążeń przedsiębiorstwa działających na platformie Apache Cassandra z kilku powodów:

  • Brak obciążeń związanych z zarządzaniem i monitorowaniem: eliminuje obciążenie związane z zarządzaniem ustawieniami systemu operacyjnego, JVM i YAML oraz ich interakcjami.

  • Znaczne oszczędności kosztów: możesz zaoszczędzić koszty dzięki usłudze Azure Cosmos DB, która obejmuje koszty maszyn wirtualnych, przepustowości i wszelkich odpowiednich licencji. Nie musisz zarządzać centrami danych, serwerami, magazynem SSD, siecią i kosztami energii elektrycznej.

  • Możliwość korzystania z istniejącego kodu i narzędzi: usługa Azure Cosmos DB zapewnia zgodność na poziomie protokołu przewodowego z istniejącymi zestawami SDK i narzędziami Cassandra. Ta zgodność zapewnia możliwość korzystania z istniejącej bazy kodu z usługą Azure Cosmos DB dla systemu Apache Cassandra z prostymi zmianami.

Istnieje wiele sposobów migrowania obciążeń bazy danych z jednej platformy do innej. Azure Databricks to oferta platformy jako usługi (PaaS) dla platformy Apache Spark , która oferuje sposób przeprowadzania migracji w trybie offline na dużą skalę. W tym artykule opisano kroki wymagane do migracji danych z natywnych przestrzeni kluczy i tabel apache Cassandra do usługi Azure Cosmos DB for Apache Cassandra przy użyciu usługi Azure Databricks.

Wymagania wstępne

Aprowizuj klaster usługi Azure Databricks

Aby aprowizować klaster usługi Azure Databricks, możesz postępować zgodnie z instrukcjami. Zalecamy wybranie środowiska Uruchomieniowego usługi Databricks w wersji 7.5, która obsługuje platformę Spark 3.0.

Zrzut ekranu przedstawiający znajdowanie wersji środowiska uruchomieniowego usługi Databricks.

Dodawanie zależności

Musisz dodać bibliotekę łącznika Apache Spark Cassandra connector do klastra, aby nawiązać połączenie zarówno z punktami końcowymi macierzystymi, jak i z punktami końcowymi Cassandra usługi Azure Cosmos DB. W klastrze wybierz pozycję Biblioteki>Zainstaluj nowe>narzędzie Maven, a następnie dodaj com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 współrzędne narzędzia Maven.

Zrzut ekranu przedstawiający wyszukiwanie pakietów Maven w usłudze Databricks.

Wybierz pozycję Zainstaluj, a następnie uruchom ponownie klaster po zakończeniu instalacji.

Uwaga

Upewnij się, że klaster usługi Databricks został uruchomiony ponownie po zainstalowaniu biblioteki łącznika Cassandra.

Ostrzeżenie

Przykłady pokazane w tym artykule zostały przetestowane przy użyciu platformy Spark w wersji 3.0.1 i odpowiadającego mu łącznika Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Nowsze wersje platformy Spark i/lub łącznika Cassandra mogą nie działać zgodnie z oczekiwaniami.

Tworzenie notesu Scala na potrzeby migracji

Tworzenie notesu Scala w usłudze Databricks. Zastąp źródłowe i docelowe konfiguracje bazy danych Cassandra odpowiednimi poświadczeniami oraz źródłowymi i docelowymi przestrzeniami kluczy i tabelami. Następnie uruchom następujący kod:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    //"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1", // Spark 3.x
    "spark.cassandra.connection.connections_per_executor_max"-> "1", // Spark 2.x
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Uwaga

Wartości spark.cassandra.output.batch.size.rows i i spark.cassandra.output.concurrent.writes oraz liczba procesów roboczych w klastrze Spark są ważnymi konfiguracjami, które należy dostosować, aby uniknąć ograniczania szybkości. Ograniczanie szybkości występuje, gdy żądania do usługi Azure Cosmos DB przekraczają aprowizowaną przepływność lub jednostki żądań (RU). Może być konieczne dostosowanie tych ustawień w zależności od liczby funkcji wykonawczych w klastrze Spark i potencjalnie rozmiaru (a w związku z tym kosztu jednostek RU) każdego rekordu zapisywanego w tabelach docelowych.

Rozwiązywanie problemów

Ograniczanie szybkości (błąd 429)

Może zostać wyświetlony kod błędu 429 lub tekst błędu "Częstotliwość żądań jest duży", nawet jeśli ustawienia są ograniczone do ich minimalnych wartości. Następujące scenariusze mogą spowodować ograniczenie szybkości:

  • Przepływność przydzielona do tabeli jest mniejsza niż 6000 jednostek żądań. Nawet w przypadku minimalnych ustawień platforma Spark może zapisywać w tempie około 6000 jednostek żądań lub więcej. Jeśli aprowizujesz tabelę w przestrzeni kluczy z udostępnioną przepływnością, możliwe, że ta tabela ma mniej niż 6000 jednostek RU dostępnych w czasie wykonywania.

    Upewnij się, że tabela, do której przeprowadzasz migrację, ma co najmniej 6000 jednostek RU dostępnych podczas uruchamiania migracji. W razie potrzeby przydziel dedykowane jednostki żądań do tej tabeli.

  • Nadmierne niesymetryczność danych z dużą ilością danych. Jeśli masz dużą ilość danych do przeprowadzenia migracji do danej tabeli, ale istnieje znaczna niesymetryczność danych (czyli duża liczba rekordów zapisywanych dla tej samej wartości klucza partycji), nadal może wystąpić ograniczenie szybkości, nawet jeśli w tabeli jest aprowizowanych kilka jednostek żądań . Jednostki żądań są dzielone równomiernie między partycje fizyczne, a duże niesymetryczność danych może spowodować wąskie gardło żądań do pojedynczej partycji.

    W tym scenariuszu zmniejsz do minimalnych ustawień przepływności na platformie Spark i wymuś powolne działanie migracji. Ten scenariusz może być bardziej typowy podczas migrowania tabel odwołań lub kontrolek, gdzie dostęp jest rzadziej i może być wysoki. Jeśli jednak istnieje znaczna niesymetryczność w dowolnej innej tabeli, warto przejrzeć model danych, aby uniknąć problemów z gorącą partycją dla obciążenia podczas operacji w stanie stałym.

Następne kroki