Consulta de bases de datos con JDBC
Azure Databricks admite la conexión a bases de datos externas mediante JDBC. En este artículo se proporciona la sintaxis básica para configurar y usar estas conexiones con ejemplos de Python, SQL y Scala.
Importante
Las configuraciones descritas en este artículo son experimentales. Las características experimentales se proporcionan tal cual y no cuentan con soporte técnico de Databricks a través del soporte técnico al cliente. Para obtener compatibilidad completa con la federación de consultas, debe usar la federación de Lakehouse, que permite a los usuarios de Azure Databricks aprovechar la sintaxis de Unity Catalog y las herramientas de gobernanza de datos.
Partner Connect proporciona integraciones optimizadas para sincronizar datos con muchos orígenes de datos externos. Consulte ¿Qué es Databricks Partner Connect?
Importante
Los ejemplos de este artículo no incluyen nombres de usuario y contraseñas en las direcciones URL de JDBC. Databricks recomienda usar secretos para almacenar las credenciales de la base de datos. Por ejemplo:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Para hacer referencia a secretos de Databricks con SQL, debe configurar una propiedad de configuración de Spark durante la inicialización del clúster.
Para obtener un ejemplo completo de administración de secretos, consulte Tutorial: Creación y uso de un secreto de Databricks.
Lectura de datos con JDBC
Debe configurar una serie de opciones para leer datos mediante JDBC. Tenga en cuenta que cada base de datos usa un formato diferente para <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark lee automáticamente el esquema de la tabla de base de datos y asigna sus tipos de nuevo a los tipos de Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Puede ejecutar consultas en esta tabla JDBC:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Escritura de datos con JDBC
Para guardar datos en tablas con JDBC, se usan configuraciones similares a las que se emplean para leer. Observe el ejemplo siguiente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
El comportamiento predeterminado intenta crear una tabla y genera un error si ya existe una tabla con ese nombre.
Puede anexar datos a una tabla existente mediante la sintaxis siguiente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Puede sobrescribir una tabla existente mediante la sintaxis siguiente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Control del paralelismo para las consultas de JDBC
De forma predeterminada, el controlador de JDBC consulta la base de datos de origen con un único subproceso. Para mejorar el rendimiento de las lecturas, debe especificar una serie de opciones para controlar cuántas consultas simultáneas realiza Azure Databricks en la base de datos. En el caso de los clústeres pequeños, si establece la opción numPartitions
en el número de núcleos de los ejecutores del clúster, garantizará que todos los nodos consulten los datos en paralelo.
Advertencia
Si establece numPartitions
en un valor alto en un clúster grande, puede producirse un rendimiento negativo para la base de datos remota, ya que la realización de demasiadas consultas simultáneas puede sobrecargar el servicio. Esto es especialmente problemático para las bases de datos de aplicaciones. Tenga cuidado si establece este valor por encima de 50.
Nota:
Puede acelerar las consultas si selecciona una columna con un índice calculado en la base de datos de origen para partitionColumn
.
En el ejemplo de código siguiente se muestra cómo configurar el paralelismo para un clúster con ocho núcleos:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Nota:
Azure Databricks admite todas las opciones de Apache Spark para configurar JDBC.
Al escribir en bases de datos mediante JDBC, Apache Spark usa el número de particiones en memoria para controlar el paralelismo. Puede volver a particionar los datos antes de escribir para controlar el paralelismo. Evite un gran número de particiones en clústeres grandes para evitar sobrecargar la base de datos remota. En el ejemplo siguiente se muestra cómo volver a particionar en ocho particiones antes de escribir:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Inserción de una consulta en el motor de base de datos
Puede insertar una consulta completa en la base de datos y devolver solo el resultado. El parámetro table
identifica la tabla JDBC que se va a leer. Puede usar cualquier cosa que sea válida en una cláusula FROM
de consulta SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Control del número de filas capturadas por consulta
Los controladores de JDBC tienen un parámetro fetchSize
que controla el número de filas que se capturan a la vez de la base de datos remota.
Configuración | Resultado |
---|---|
Demasiado bajo | Latencia alta debido a muchos recorridos de ida y vuelta (pocas filas devueltas por consulta) |
Demasiado alto | Error de memoria insuficiente (demasiados datos devueltos en una consulta) |
El valor óptimo depende de la carga de trabajo. Entre las consideraciones se incluyen las siguientes:
- ¿Cuántas columnas devuelve la consulta?
- ¿Qué tipos de datos se devuelven?
- ¿Qué longitud tienen las cadenas de cada columna devuelta?
Los sistemas podrían tener un valor predeterminado muy pequeño y beneficiarse de su ajuste. Por ejemplo, el valor predeterminado de fetchSize
de Oracle es 10. Si se aumenta a 100, se reduce el número de consultas totales que deben ejecutarse en un factor de 10. Los resultados de JDBC son tráfico de red, por lo que conviene evitar números muy grandes, aunque los valores óptimos podrían ascender a millares para numerosos conjuntos de datos.
Use la opción fetchSize
, como en el ejemplo siguiente:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()