Migrowanie danych na żywo z bazy danych Apache Cassandra do usługi Azure Cosmos DB dla systemu Apache Cassandra przy użyciu serwera proxy z podwójnym zapisem i platformy Apache Spark
Interfejs API dla bazy danych Cassandra w usłudze Azure Cosmos DB stał się doskonałym wyborem dla obciążeń przedsiębiorstwa działających na platformie Apache Cassandra z różnych powodów, takich jak:
Brak obciążeń związanych z zarządzaniem i monitorowaniem: eliminuje obciążenie związane z zarządzaniem i monitorowaniem niezliczonych ustawień w systemach operacyjnych, JVM i plikach yaml oraz ich interakcjach.
Znaczne oszczędności kosztów: możesz zaoszczędzić koszty dzięki usłudze Azure Cosmos DB, która obejmuje koszty maszyn wirtualnych, przepustowości i wszelkich odpowiednich licencji. Ponadto nie trzeba zarządzać centrami danych, serwerami, magazynem SSD, siecią i kosztami energii elektrycznej.
Możliwość używania istniejącego kodu i narzędzi: usługa Azure Cosmos DB udostępnia zgodność na poziomie protokołu przewodowego z istniejącymi zestawami SDK i narzędziami platformy Cassandra. Ta zgodność zapewnia możliwość użycia istniejącej bazy kodu z usługą Azure Cosmos DB dla systemu Apache Cassandra z prostymi zmianami.
Usługa Azure Cosmos DB nie obsługuje natywnego protokołu plotek apache Cassandra na potrzeby replikacji. W związku z tym w przypadku, gdy do migracji wymagana jest zerowa przestój, konieczne jest inne podejście. W tym samouczku opisano sposób migrowania danych na żywo do usługi Azure Cosmos DB for Apache Cassandra z natywnego klastra Apache Cassandra przy użyciu serwera proxy z podwójnym zapisem i platformy Apache Spark.
Na poniższej ilustracji przedstawiono wzorzec. Serwer proxy z podwójnym zapisem służy do przechwytywania zmian na żywo, podczas gdy dane historyczne są kopiowane zbiorczo przy użyciu platformy Apache Spark. Serwer proxy może akceptować połączenia z kodu aplikacji z kilkoma zmianami konfiguracji. Spowoduje to kierowanie wszystkich żądań do źródłowej bazy danych i asynchroniczne kierowanie zapisów do interfejsu API dla bazy danych Cassandra podczas wykonywania kopiowania zbiorczego.
Wymagania wstępne
Aprowizuj konto usługi Azure Cosmos DB dla bazy danych Apache Cassandra.
Zapoznaj się z obsługiwanymi funkcjami w usłudze Azure Cosmos DB dla systemu Apache Cassandra , aby zapewnić zgodność.
Upewnij się, że masz łączność sieciową między klastrem źródłowym i docelowym interfejsem API dla punktu końcowego cassandra.
Upewnij się, że przeprowadzono już migrację schematu przestrzeni kluczy/tabeli ze źródłowej bazy danych Cassandra do docelowego interfejsu API dla konta Cassandra.
Ważne
Jeśli podczas migracji wymagane jest zachowanie bazy danych Apache Cassandra
writetime
, podczas tworzenia tabel należy ustawić następujące flagi:with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true
Na przykład:
CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users ( name text, userID int, address text, phone int, PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;
Aprowizuj klaster Spark
Zalecamy usługę Azure Databricks. Użyj środowiska uruchomieniowego obsługującego platformę Spark 3.0 lub nowszą.
Ważne
Upewnij się, że konto usługi Azure Databricks ma łączność sieciową ze źródłowym klastrem Apache Cassandra. Może to wymagać wstrzyknięcia sieci wirtualnej. Aby uzyskać więcej informacji, zobacz artykuł tutaj .
Dodawanie zależności platformy Spark
Musisz dodać bibliotekę łącznika Apache Spark Cassandra connector do klastra, aby nawiązać połączenie zarówno z punktami końcowymi macierzystymi, jak i z punktami końcowymi Cassandra usługi Azure Cosmos DB. W klastrze wybierz pozycję Biblioteki>Zainstaluj nowe>narzędzie Maven, a następnie dodaj com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
współrzędne narzędzia Maven.
Ważne
Jeśli wymagana jest ochrona bazy danych Apache Cassandra writetime
dla każdego wiersza podczas migracji, zalecamy użycie tego przykładu. Plik jar zależności w tym przykładzie zawiera również łącznik Spark, dlatego należy zainstalować go zamiast zestawu łącznika powyżej. Ten przykład jest również przydatny, jeśli chcesz przeprowadzić walidację porównania wierszy między źródłem i obiektem docelowym po zakończeniu ładowania danych historycznych. Zobacz sekcje "uruchamianie ładowania danych historycznych" i "weryfikowanie źródła i celu" poniżej, aby uzyskać więcej informacji.
Wybierz pozycję Zainstaluj, a następnie uruchom ponownie klaster po zakończeniu instalacji.
Uwaga
Pamiętaj, aby ponownie uruchomić klaster usługi Azure Databricks po zainstalowaniu biblioteki łącznika Cassandra.
Instalowanie serwera proxy z podwójnym zapisem
Aby uzyskać optymalną wydajność podczas podwójnych zapisów, zalecamy zainstalowanie serwera proxy we wszystkich węzłach w źródłowym klastrze Cassandra.
#assuming you do not have git already installed
sudo apt-get install git
#assuming you do not have maven already installed
sudo apt install maven
#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git
#change directory
cd cassandra-proxy
#compile the proxy
mvn package
Uruchamianie serwera proxy z podwójnym zapisem
Zalecamy zainstalowanie serwera proxy na wszystkich węzłach w źródłowym klastrze Cassandra. Uruchom co najmniej następujące polecenie, aby uruchomić serwer proxy w każdym węźle. Zastąp <target-server>
ciąg adresem IP lub adresem serwera z jednego z węzłów w klastrze docelowym. Zastąp ciąg ścieżką do lokalnego pliku jks i zastąp <path to JKS file>
<keystore password>
ciąg odpowiednim hasłem.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>
Uruchomienie serwera proxy w ten sposób zakłada, że spełnione są następujące warunki:
- Źródłowe i docelowe punkty końcowe mają taką samą nazwę użytkownika i hasło.
- Źródłowe i docelowe punkty końcowe implementują protokół SECURE Sockets Layer (SSL).
Jeśli źródłowe i docelowe punkty końcowe nie mogą spełnić tych kryteriów, zapoznaj się z tematem dalsze opcje konfiguracji.
Konfigurowanie protokołu SSL
W przypadku protokołu SSL można zaimplementować istniejący magazyn kluczy (na przykład używany przez klaster źródłowy) lub utworzyć certyfikat z podpisem własnym przy użyciu polecenia keytool
:
keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048
Możesz również wyłączyć protokół SSL dla źródłowych lub docelowych punktów końcowych, jeśli nie implementują protokołu SSL. --disable-source-tls
Użyj flag lub--disable-target-tls
:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password> --disable-source-tls true --disable-target-tls true
Uwaga
Podczas tworzenia połączeń SSL z bazą danych za pośrednictwem serwera proxy upewnij się, że aplikacja kliencka używa tego samego magazynu kluczy i hasła, co dla serwera proxy.
Konfigurowanie poświadczeń i portów
Domyślnie poświadczenia źródłowe będą przekazywane z aplikacji klienckiej. Serwer proxy będzie używać poświadczeń do nawiązywania połączeń z klastrami źródłowymi i docelowymi. Jak wspomniano wcześniej, ten proces zakłada, że poświadczenia źródłowe i docelowe są takie same. Konieczne będzie oddzielne określenie innej nazwy użytkownika i hasła docelowego interfejsu API dla punktu końcowego Cassandra podczas uruchamiania serwera proxy:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
Domyślne porty źródłowe i docelowe, jeśli nie zostaną określone, będą mieć wartość 9042. W takim przypadku interfejs API dla bazy danych Cassandra działa na porcie 10350
, więc musisz użyć --source-port
polecenia lub --target-port
określić numery portów:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> --target-username <username> --target-password <password>
Zdalne wdrażanie serwera proxy
Mogą wystąpić okoliczności, w których nie chcesz instalować serwera proxy na samych węzłach klastra i wolisz zainstalować go na osobnej maszynie. W tym scenariuszu należy określić adres IP :<source-server>
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>
Ostrzeżenie
Zainstalowanie i zdalne uruchomienie serwera proxy na osobnej maszynie (zamiast uruchamiania go na wszystkich węzłach w źródłowym klastrze Apache Cassandra) będzie miało wpływ na wydajność podczas migracji na żywo. Chociaż będzie działać funkcjonalnie, sterownik klienta nie będzie mógł otwierać połączeń ze wszystkimi węzłami w klastrze i będzie polegać na jednym węźle koordynowania (gdzie jest zainstalowany serwer proxy) do nawiązywania połączeń.
Zezwalaj na zerowe zmiany kodu aplikacji
Domyślnie serwer proxy nasłuchuje na porcie 29042. Kod aplikacji należy zmienić tak, aby wskazywał ten port. Można jednak zmienić port, na którym nasłuchuje serwer proxy. Możesz to zrobić, jeśli chcesz wyeliminować zmiany kodu na poziomie aplikacji przez:
- Po uruchomieniu źródłowego serwera Cassandra na innym porcie.
- Uruchomienie serwera proxy na standardowym porcie Cassandra 9042.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042
Uwaga
Instalowanie serwera proxy w węzłach klastra nie wymaga ponownego uruchomienia węzłów. Jeśli jednak masz wielu klientów aplikacji i wolisz mieć serwer proxy uruchomiony na standardowym porcie Cassandra 9042, aby wyeliminować wszelkie zmiany kodu na poziomie aplikacji, musisz zmienić domyślny port apache Cassandra. Następnie należy ponownie uruchomić węzły w klastrze i skonfigurować port źródłowy jako nowy port zdefiniowany dla źródłowego klastra Cassandra.
W poniższym przykładzie zmienimy źródłowy klaster Cassandra na uruchomiony na porcie 3074 i uruchomimy klaster na porcie 9042:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042 --source-port 3074
Wymuszanie protokołów
Serwer proxy ma funkcje wymuszania protokołów, które mogą być konieczne, jeśli źródłowy punkt końcowy jest bardziej zaawansowany niż docelowy lub w inny sposób nie jest obsługiwany. W takim przypadku można określić --protocol-version
i --cql-version
wymusić, aby protokół był zgodny z celem:
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --protocol-version 4 --cql-version 3.11
Po uruchomieniu serwera proxy z podwójnym zapisem należy zmienić port na kliencie aplikacji i uruchomić go ponownie. (Lub zmień port Cassandra i uruchom ponownie klaster, jeśli wybrano to podejście). Następnie serwer proxy rozpocznie przekazywanie zapisów do docelowego punktu końcowego. Informacje o monitorowaniu i metrykach dostępnych w narzędziu serwera proxy.
Uruchamianie ładowania danych historycznych
Aby załadować dane, utwórz notes Języka Scala na koncie usługi Azure Databricks. Zastąp źródłowe i docelowe konfiguracje bazy danych Cassandra odpowiednimi poświadczeniami, a następnie zastąp źródłowe i docelowe przestrzenie kluczy i tabele. Dodaj więcej zmiennych dla każdej tabeli zgodnie z wymaganiami do poniższego przykładu, a następnie uruchom polecenie . Gdy aplikacja zacznie wysyłać żądania do serwera proxy z podwójnym zapisem, możesz przystąpić do migracji danych historycznych.
Ważne
Przed migracją danych zwiększ przepływność kontenera do ilości wymaganej do szybkiej migracji aplikacji. Skalowanie przepływności przed rozpoczęciem migracji ułatwi migrowanie danych w krótszym czasie. Aby ułatwić ochronę przed ograniczaniem szybkości podczas ładowania danych historycznych, możesz włączyć ponawianie prób po stronie serwera (SSR) w interfejsie API dla bazy danych Cassandra. Zobacz nasz artykuł tutaj , aby uzyskać więcej informacji i instrukcje dotyczące włączania usługi SSR.
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "10350",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.option("writetime", timestamp)
.mode(SaveMode.Append)
.save
Uwaga
W poprzednim przykładzie Scala zauważysz, że timestamp
jest ustawiana na bieżący czas przed odczytaniem wszystkich danych w tabeli źródłowej. writetime
Następnie jest ustawiana na tę sygnaturę czasową wsteczną. Gwarantuje to, że rekordy zapisywane z ładowania danych historycznych do docelowego punktu końcowego nie mogą zastąpić aktualizacji, które pochodzą z późniejszego sygnatury czasowej z serwera proxy z podwójnym zapisem, podczas gdy dane historyczne są odczytywane.
Ważne
Jeśli musisz zachować dokładne sygnatury czasowe z jakiegokolwiek powodu, należy podjąć podejście do migracji danych historycznych, które zachowuje sygnatury czasowe, takie jak ten przykład. Plik jar zależności w przykładzie zawiera również łącznik Spark, więc nie trzeba instalować zestawu łącznika Spark wymienionego we wcześniejszych wymaganiach wstępnych — po zainstalowaniu obu w klastrze Spark wystąpią konflikty.
Weryfikowanie źródła i miejsca docelowego
Po zakończeniu ładowania danych historycznych bazy danych powinny być zsynchronizowane i gotowe do migracji jednorazowej. Zalecamy jednak zweryfikowanie źródła i celu, aby upewnić się, że są one zgodne przed zakończeniem cięcia.
Uwaga
Jeśli użyto przykładu cassandra migrator wymienionego powyżej w celu zachowania writetime
wartości , obejmuje to możliwość weryfikacji migracji przez porównanie wierszy w źródle i obiekcie docelowym na podstawie pewnych tolerancji.