Databáze dotazu používající JDBC
Azure Databricks podporuje připojení k externím databázím pomocí JDBC. Tento článek obsahuje základní syntaxi pro konfiguraci a použití těchto připojení s příklady v Pythonu, SQL a Scala.
Důležité
Konfigurace popsané v tomto článku jsou experimentální. Experimentální funkce jsou poskytovány tak, jak jsou, a Databricks je nepodporuje prostřednictvím technické podpory zákazníků. Pokud chcete získat plnou podporu federace dotazů, měli byste místo toho použít Lakehouse Federation, která uživatelům Azure Databricks umožňuje využívat syntaxi katalogu Unity a nástroje zásad správného řízení dat.
Partner Connect poskytuje optimalizované integrace pro synchronizaci dat s mnoha externími zdroji dat. Podívejte se, co je Databricks Partner Connect?
Důležité
Příklady v tomto článku nezahrnují uživatelská jména a hesla do adres URL JDBC. Databricks doporučuje používat tajné kódy k ukládání přihlašovacích údajů databáze. Příklad:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Pokud chcete odkazovat na tajné kódy Databricks s SQL, musíte během využití clusteru nakonfigurovat vlastnost konfigurace Sparku.
Úplný příklad správy tajných kódů najdete v tématu Kurz: Vytvoření a použití tajného kódu Databricks.
Čtení dat pomocí JDBC
Musíte nakonfigurovat řadu nastavení pro čtení dat pomocí JDBC. Všimněte si, že každá databáze používá jiný formát .<jdbc-url>
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark automaticky načte schéma z databázové tabulky a mapuje jeho typy zpět na typy Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Můžete spouštět dotazy na tuto tabulku JDBC:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Zápis dat pomocí JDBC
Ukládání dat do tabulek pomocí JDBC používá podobné konfigurace ke čtení. Prohlédněte si následující příklad:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Výchozí chování se pokusí vytvořit novou tabulku a vyvolá chybu, pokud tabulka s tímto názvem již existuje.
Data můžete k existující tabulce připojit pomocí následující syntaxe:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Existující tabulku můžete přepsat pomocí následující syntaxe:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Řízení paralelismu pro dotazy JDBC
Ovladač JDBC ve výchozím nastavení dotazuje zdrojovou databázi pouze s jedním vláknem. Pokud chcete zvýšit výkon čtení, musíte zadat řadu možností, které určují, kolik souběžných dotazů Azure Databricks provádí ve vaší databázi. U malých clusterů nastavíte numPartitions
možnost rovnající se počtu jader exekutoru v clusteru, aby všechny uzly dotazovávali data paralelně.
Upozorňující
Nastavení numPartitions
vysoké hodnoty ve velkém clusteru může mít za následek negativní výkon vzdálené databáze, protože příliš mnoho souběžných dotazů může službu zahltit. To je obzvláště problematické pro aplikační databáze. Buďte opatrní při nastavování této hodnoty nad 50.
Poznámka:
Urychlíte dotazy výběrem sloupce s indexem vypočítaným ve zdrojové databázi .partitionColumn
Následující příklad kódu ukazuje konfiguraci paralelismu pro cluster s osmi jádry:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Poznámka:
Azure Databricks podporuje všechny možnosti Apache Sparku pro konfiguraci JDBC.
Při zápisu do databází pomocí JDBC používá Apache Spark počet oddílů v paměti k řízení paralelismu. Před zápisem do správy paralelismu můžete data předělovat. Vyhněte se velkému počtu oddílů ve velkých clusterech, abyste se vyhnuli zahlcení vzdálené databáze. Následující příklad ukazuje dělení na osm oddílů před zápisem:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Nasdílení dotazu do databázového stroje
Do databáze můžete odeslat celý dotaz a vrátit jenom výsledek. Parametr table
identifikuje tabulku JDBC, která se má přečíst. V klauzuli dotazu FROM
SQL můžete použít cokoli, co je platné.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Řízení počtu načtených řádků na dotaz
Ovladače JDBC mají fetchSize
parametr, který řídí počet řádků načtených najednou ze vzdálené databáze.
Nastavení | Výsledek |
---|---|
Příliš nízká | Vysoká latence kvůli velkému počtu zaokrouhlení (několik řádků vrácených na dotaz) |
Příliš vysoká | Chyba nedostatku paměti (příliš mnoho dat vrácených v jednom dotazu) |
Optimální hodnota je závislá na úloze. Mezi důležité informace patří:
- Kolik sloupců dotaz vrátí?
- Jaké datové typy se vrátí?
- Jak dlouho se vrátí řetězce v jednotlivých sloupcích?
Systémy můžou mít velmi malé výchozí nastavení a těžit z ladění. Například: Výchozí hodnota fetchSize
Oracle je 10. Když ho zvýšíte na 100, sníží se celkový počet dotazů, které je potřeba provést faktorem 10. Výsledky JDBC jsou síťový provoz, takže vyhněte se velmi velkým číslům, ale optimální hodnoty můžou být v tisících pro mnoho datových sad.
fetchSize
Použijte možnost, jak je znázorněno v následujícím příkladu:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()