Velocizzare l'analisi di Big Data in tempo reale con il connettore Spark
Si applica a: Database SQL di Azure Istanza gestita di SQL di Azure
Nota
A partire da settembre 2020, questo connettore non è più mantenuto attivamente. Tuttavia, Apache Spark Connessione or per SQL Server e Azure SQL è ora disponibile, con il supporto per le associazioni Python e R, un'interfaccia più semplice da usare per l'inserimento bulk dei dati e molti altri miglioramenti. È consigliabile valutare e usare il nuovo connettore anziché questo. Le informazioni sul connettore precedente (questa pagina) vengono conservate solo a scopo di archiviazione.
Il connettore Spark consente ai database SQL di Azure, Istanza gestita di SQL di Azure e SQL Server di fungere da origine dati di input o sink di dati di output per i processi Spark. È possibile utilizzare dati transazionali in tempo reale nel processo di big data analytics e mantenere i risultati per query ad hoc o per la generazione di report. Rispetto al connettore JDBC incorporato, questo connettore offre la possibilità di inserire dati in grandi quantità nel proprio database. È possibile ottenere eccezionali prestazioni di inserimento di una riga alla volta con una velocità maggiore di 10 o 20 volte. Il connettore Spark supporta l'autenticazione con Microsoft Entra ID (in precedenza Azure Active Directory) per connettersi a database SQL di Azure e Istanza gestita di SQL di Azure, consentendo di connettere il database da Azure Databricks usando l'account Microsoft Entra. Fornisce interfacce simili al connettore JDBC incorporato. È facile eseguire la migrazione dei processi Spark esistenti per l'utilizzo di questo nuovo connettore.
Nota
Microsoft Entra ID era precedentemente conosciuto come Azure Active Directory (Azure AD).
Scaricare e compilare un connettore Spark
Il repository GitHub per il connettore precedentemente collegato a da questa pagina non viene gestito attivamente. È invece consigliabile valutare e usare il nuovo connettore.
Versioni supportate ufficialmente
Componente | Versione |
---|---|
Apache Spark | 2.0.2 o versione successiva |
Scala | 2.10 o versione successiva |
Driver Microsoft JDBC per SQL Server | 6.2 o versione successiva |
Microsoft SQL Server | SQL Server 2008 o versione successiva |
Database SQL di Azure | Supportata |
Istanza gestita di SQL di Azure | Supportata |
Il connettore Spark utilizza Microsoft JDBC Driver per SQL Server per trasferire dati tra i nodi di lavoro Spark e i database SQL:
Il flusso di dati è il seguente:
- Il nodo principale Spark si connette al database SQL o a SQL Server e carica i dati da una tabella specifica o utilizzando una query SQL specifica.
- Il nodo principale Spark distribuisce i dati ai nodi di lavoro per la trasformazione.
- Il nodo di lavoro si connette a SQL Server o al database SQL e scrive i dati sul database. L'utente può scegliere se utilizzare l'inserimento riga per riga o l'inserimento in blocco.
Il diagramma seguente illustra il flusso di dati.
Creare il connettore Spark
Attualmente il progetto del connettore usa Maven. Per creare il connettore senza le dipendenze, è possibile eseguire:
- mvn clean package
- Scaricare le versioni più recenti del file JAR dalla cartella release
- Includere il JAR Spark del database SQL
Effettuare la connessione e leggere i dati usando il connettore Spark
È possibile connettersi al database SQL e SQL Server dai processi di Spark per leggere o scrivere dati. È anche possibile eseguire una query DML o DDL in un database SQL e SQL Server.
Leggere dati da Azure SQL o SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"user" -> "username",
"password" -> "*********",
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5" //seconds
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Leggere i dati da Azure SQL e o SQL Server con query SQL specificata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"queryCustom" -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
"user" -> "username",
"password" -> "*********",
))
//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()
Scrivere i dati in Azure SQL e SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
// Aquire a DataFrame collection (val collection)
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"user" -> "username",
"password" -> "*********"
))
import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)
Eseguire una query DML o DDL in Azure SQL o SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
|UPDATE Customers
|SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
|WHERE CustomerID = 1;
""".stripMargin
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"queryCustom" -> query
))
sqlContext.sqlDBQuery(config)
Connettersi da Spark utilizzando l'autenticazione di Microsoft Entra
È possibile connettersi a database SQL e Istanza gestita di SQL usando l'autenticazione di Microsoft Entra. Usare l'autenticazione di Microsoft Entra per gestire centralmente le identità degli utenti del database e come alternativa all'autenticazione di SQL.
Connessione tramite la modalità di autenticazione ActiveDirectoryPassword
Requisiti di installazione
Se si utilizza la modalità di autenticazione ActiveDirectoryPassword, è necessario scaricare microsoft-authentication-library-for-java e le relative dipendenze e includerli nel percorso di compilazione Java.
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"authentication" -> "ActiveDirectoryPassword",
"encrypt" -> "true"
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Connessione tramite token di accesso
Requisiti di installazione
Se si utilizza la modalità di autenticazione basata su token di accesso, è necessario scaricare microsoft-authentication-library-for-java e le relative dipendenze e includerli nel percorso di compilazione Java.
Vedere Usare l'autenticazione di Microsoft Entra per informazioni su come ottenere un token di accesso al database in database SQL di Azure o Istanza gestita di SQL di Azure.
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"accessToken" -> "access_token",
"hostNameInCertificate" -> "*.database.windows.net",
"encrypt" -> "true"
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Scrivere dati usando BULK INSERT
Il connettore jdbc tradizionali scrive dati nel database tramite l'inserimento riga per riga. È possibile utilizzare il connettore da Spark per scrivere dati in Azure SQL e SQL Server tramite l’inserimento in blocco. Migliora significativamente le prestazioni di scrittura durante il caricamento di grandi set di dati o il caricamento dei dati in tabelle in cui viene utilizzato un indice ColumnStore, quindi orientato alle colonne.
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
/**
Add column Metadata.
If not specified, metadata is automatically added
from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
Passaggi successivi
Se ancora non lo si è fatto, scaricare il connettore Spark dal repository GitHub azure-sqldb-spark ed esplorare le risorse aggiuntive nel repository:
È possibile anche esaminare la guida ad Apache Spark SQL, DataFrame e set di dati e la documentazione Azure Databricks.