Creare cluster, notebook e processi con Terraform
Questo articolo illustra come usare il provider Databricks Terraform per creare un cluster, un notebook e un processo in un'area di lavoro di Azure Databricks esistente.
Questo articolo è complementare ai seguenti articoli introduttivi di Azure Databricks:
Esercitazione: Eseguire una pipeline di analisi lakehouse end-to-end, che usa un cluster che funziona con il catalogo Unity, un notebook Python e un processo per eseguire il notebook.
Avvio rapido: Eseguire un processo Spark nell'area di lavoro di Azure Databricks tramite il portale di Azure, che usa un cluster per utilizzo generico e un notebook Python.
È possibile adattare le configurazioni di Terraform di questo articolo anche per creare cluster, notebook e processi personalizzati nelle aree di lavoro.
Passaggio 1: Creare e configurare il progetto Terraform
Creare un progetto Terraform seguendo le istruzioni nella sezione Requisiti dell'articolo Panoramica del provider Terraform di Databricks.
Per creare un cluster, creare un file denominato
cluster.tf
e aggiungervi il seguente contenuto. Questo contenuto crea un cluster con la quantità minima di risorse consentite. Questo cluster usa la versione LTS (Long Term Support) più recente di Databricks Runtime.Per un cluster che funziona con il catalogo Unity:
variable "cluster_name" {} variable "cluster_autotermination_minutes" {} variable "cluster_num_workers" {} variable "cluster_data_security_mode" {} # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers data_security_mode = var.cluster_data_security_mode } output "cluster_url" { value = databricks_cluster.this.url }
Per un cluster multiuso:
variable "cluster_name" { description = "A name for the cluster." type = string default = "My Cluster" } variable "cluster_autotermination_minutes" { description = "How many minutes before automatically terminating due to inactivity." type = number default = 60 } variable "cluster_num_workers" { description = "The number of workers." type = number default = 1 } # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers } output "cluster_url" { value = databricks_cluster.this.url }
Per creare un cluster, creare un altro file denominato
cluster.auto.tfvars
e aggiungervi il seguente contenuto. Questo file contiene valori di variabile per la personalizzazione del cluster. Sostituire i valori segnaposto con i propri valori.Per un cluster che funziona con il catalogo Unity:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1 cluster_data_security_mode = "SINGLE_USER"
Per un cluster multiuso:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1
Per creare un notebook, creare un altro file denominato
notebook.tf
e aggiungervi il seguente contenuto:variable "notebook_subdirectory" { description = "A name for the subdirectory to store the notebook." type = string default = "Terraform" } variable "notebook_filename" { description = "The notebook's filename." type = string } variable "notebook_language" { description = "The language of the notebook." type = string } resource "databricks_notebook" "this" { path = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}" language = var.notebook_language source = "./${var.notebook_filename}" } output "notebook_url" { value = databricks_notebook.this.url }
Se si crea un cluster, salvare il codice del notebook seguente in un file nella stessa directory del file
notebook.tf
:Per il notebook Python per Esercitazione: Eseguire una pipeline di analisi lakehouse end-to-end, un file denominato
notebook-getting-started-lakehouse-e2e.py
con il contenuto seguente:# Databricks notebook source external_location = "<your_external_location>" catalog = "<your_catalog>" dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True) display(dbutils.fs.head(f"{external_location}/foobar.txt")) dbutils.fs.rm(f"{external_location}/foobar.txt") display(spark.sql(f"SHOW SCHEMAS IN {catalog}")) # COMMAND ---------- from pyspark.sql.functions import col # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"{catalog}.e2e_lakehouse_{username}_db" source = f"{external_location}/e2e-lakehouse-source" table = f"{database}.target_table" checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") # Clear out data from previous demo execution dbutils.fs.rm(source, True) dbutils.fs.rm(checkpoint_path, True) # Define a class to load batches of data to source class LoadData: def __init__(self, source): self.source = source def get_date(self): try: df = spark.read.format("json").load(source) except: return "2016-01-01" batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0] if batch_date.month == 3: raise Exception("Source data exhausted") return batch_date def get_batch(self, batch_date): return ( spark.table("samples.nyctaxi.trips") .filter(col("tpep_pickup_datetime").cast("date") == batch_date) ) def write_batch(self, batch): batch.write.format("json").mode("append").save(self.source) def land_batch(self): batch_date = self.get_date() batch = self.get_batch(batch_date) self.write_batch(batch) RawData = LoadData(source) # COMMAND ---------- RawData.land_batch() # COMMAND ---------- # Import functions from pyspark.sql.functions import col, current_timestamp # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .option("mergeSchema", "true") .toTable(table)) # COMMAND ---------- df = spark.read.table(table_name) # COMMAND ---------- display(df)
Per il notebook Python per Avvio rapido: Eseguire un processo Spark nell'area di lavoro di Azure Databricks usando il portale di Azure, un file denominato
notebook-quickstart-create-databricks-workspace-portal.py
con il contenuto seguente:# Databricks notebook source blob_account_name = "azureopendatastorage" blob_container_name = "citydatacontainer" blob_relative_path = "Safety/Release/city=Seattle" blob_sas_token = r"" # COMMAND ---------- wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name,blob_relative_path) spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token) print('Remote blob path: ' + wasbs_path) # COMMAND ---------- df = spark.read.parquet(wasbs_path) print('Register the DataFrame as a SQL temporary view: source') df.createOrReplaceTempView('source') # COMMAND ---------- print('Displaying top 10 rows: ') display(spark.sql('SELECT * FROM source LIMIT 10'))
Se si sta creando un notebook, creare un altro file denominato
notebook.auto.tfvars
e aggiungervi il seguente contenuto. Questo file contiene valori di variabile per la personalizzazione della configurazione del notebook.Per il notebook Python per Esercitazione: Eseguire una pipeline di analisi lakehouse end-to-end:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-getting-started-lakehouse-e2e.py" notebook_language = "PYTHON"
Per il notebook Python per Avvio rapido: Eseguire un processo Spark nell'area di lavoro di Azure Databricks tramite il portale di Azure:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-quickstart-create-databricks-workspace-portal.py" notebook_language = "PYTHON"
Se si crea un notebook, nell'area di lavoro di Azure Databricks assicurarsi di configurare eventuali requisiti per l'esecuzione corretta del notebook, facendo riferimento alle istruzioni seguenti per:
- Il notebook Python per Esercitazione: Eseguire una pipeline di analisi lakehouse end-to-end
- Il notebook Python per Avvio rapido: Eseguire un processo Spark nell'area di lavoro di Azure Databricks tramite il portale di Azure
Per creare il processo, creare un altro file denominato
job.tf
e aggiungervi il seguente contenuto. Questo contenuto crea un processo per eseguire il notebook.variable "job_name" { description = "A name for the job." type = string default = "My Job" } variable "task_key" { description = "A name for the task." type = string default = "my_task" } resource "databricks_job" "this" { name = var.job_name task { task_key = var.task_key existing_cluster_id = databricks_cluster.this.cluster_id notebook_task { notebook_path = databricks_notebook.this.path } } email_notifications { on_success = [ data.databricks_current_user.me.user_name ] on_failure = [ data.databricks_current_user.me.user_name ] } } output "job_url" { value = databricks_job.this.url }
Se si sta creando un processo, creare un altro file denominato
job.auto.tfvars
e aggiungervi il seguente contenuto. Questo file contiene un valore di variabile per personalizzare la configurazione del processo.job_name = "My Job" task_key = "my_task"
Passaggio 2: Eseguire le configurazioni
In questo passaggio si eseguono le configurazioni Terraform per distribuire il cluster, il notebook e il processo nell'area di lavoro di Azure Databricks.
Verificare se le configurazioni di Terraform sono valide eseguendo il comando
terraform validate
. Se vengono segnalati errori, correggerli ed eseguire di nuovo il comando.terraform validate
Verificare le operazioni che Terraform eseguirà nell'area di lavoro, prima che le esegua effettivamente, eseguendo il comando
terraform plan
.terraform plan
Distribuire il cluster, il notebook e il processo nell'area di lavoro eseguendo il comando
terraform apply
. Al prompt che chiede di distribuire, digitareyes
e premere Invio.terraform apply
Terraform distribuisce le risorse specificate nel progetto. La distribuzione di queste risorse (in particolare di un cluster) può richiedere alcuni minuti.
Passaggio 3: Esplorare i risultati
Se è stato creato un cluster, nell'output del comando
terraform apply
copiare il collegamento accanto acluster_url
e incollarlo nella barra degli indirizzi del Web browser.Se è stato creato un notebook, nell'output del comando
terraform apply
copiare il collegamento accanto anotebook_url
e incollarlo nella barra degli indirizzi del Web browser.Nota
Prima di usare il notebook, potrebbe essere necessario personalizzarne il contenuto. Vedere la relativa documentazione su come personalizzare il notebook.
Se è stato creato un processo, nell'output del comando
terraform apply
copiare il collegamento accanto ajob_url
e incollarlo nella barra degli indirizzi del Web browser.Nota
Prima di eseguire il notebook, potrebbe essere necessario personalizzarne il contenuto. Vedere i collegamenti all'inizio di questo articolo per consultare la documentazione correlata su come personalizzare il notebook.
Se è stato creato un processo, eseguire il processo come segue:
- Fare clic su Esegui ora nella pagina del processo.
- Al termine dell'esecuzione del processo, per visualizzare i risultati dell'esecuzione del processo, nell'elenco Esecuzioni completate (ultimi 60 giorni) della pagina del processo, fare clic sulla voce relativa all’ora più recente nella colonna Ora di inizio. Il riquadro Output mostra il risultato dell'esecuzione del codice del notebook.
Passaggio 4: Pulizia
In questo passaggio si eliminano le risorse precedenti dall'area di lavoro.
Verificare le operazioni che Terraform eseguirà nell'area di lavoro, prima che le esegua effettivamente, eseguendo il comando
terraform plan
.terraform plan
Eliminare il cluster, il notebook e il processo dall'area di lavoro eseguendo il comando
terraform destroy
. Al prompt che chiede di eliminare, digitareyes
e premere Invio.terraform destroy
Terraform elimina le risorse specificate nel progetto.