Compartir vía


Envío de trabajos de Spark mediante herramientas de línea de comandos

Se aplica a: SQL Server 2019 (15.x)

En este artículo se proporcionan instrucciones sobre cómo usar las herramientas de línea de comandos para ejecutar trabajos de Spark en Clústeres de macrodatos de SQL Server.

Importante

El complemento Clústeres de macrodatos de Microsoft SQL Server 2019 se va a retirar. La compatibilidad con Clústeres de macrodatos de SQL Server 2019 finalizará el 28 de febrero de 2025. Todos los usuarios existentes de SQL Server 2019 con Software Assurance serán totalmente compatibles con la plataforma, y el software se seguirá conservando a través de actualizaciones acumulativas de SQL Server hasta ese momento. Para más información, consulte la entrada de blog sobre el anuncio y Opciones de macrodatos en la plataforma Microsoft SQL Server.

Requisitos previos

  • Herramientas de macrodatos de SQL Server 2019 configuradas e iniciadas en el clúster:
    • azdata
    • Aplicación curl para realizar llamadas API REST a Livy

Trabajos de Spark que usan azdata o Livy

En este artículo se proporcionan ejemplos de cómo usar patrones de línea de comandos para enviar aplicaciones Spark a Clústeres de macrodatos de SQL Server.

Los comandos azdata bdc spark de la CLI de datos de Azure revelan todas las funcionalidades de Spark para Clústeres de macrodatos de SQL Server en la línea de comandos. Este artículo se centra en el envío de trabajos. Sin embargo, azdata bdc spark también admite modos interactivos para Python, Scala, SQL y R por medio del comando azdata bdc spark session.

Si necesita la integración directa con una API REST, use las llamadas Livy estándar para enviar los trabajos. En este artículo se usa la herramienta de línea de comandos curl en los ejemplos de Livy para ejecutar la llamada API REST. Para obtener un ejemplo detallado que muestra cómo interactuar con el punto de conexión de Livy de Spark mediante código de Python, consulte cómo usar Spark desde el punto de conexión de Livy en GitHub.

ETL simple con Spark para Clústeres de macrodatos

Esta aplicación de extracción, transformación y carga (ETL) sigue un patrón de ingeniería de datos común. Carga los datos tabulares desde una ruta de acceso de zona de aterrizaje del Sistema de archivos distribuido de Apache Hadoop (HDFS). A continuación, los escribe con formato de tabla en una ruta de acceso de zona procesada por HDFS.

Descargue el conjunto de datos de la aplicación de ejemplo. A continuación, cree aplicaciones de PySpark mediante PySpark, Spark Scala o Spark SQL.

En las secciones siguientes, encontrará ejercicios de ejemplo para cada solución. Seleccione la pestaña correspondiente a su plataforma. La aplicación se ejecutará mediante azdata o curl.

En este ejemplo se usa la aplicación de PySpark siguiente. Se guarda como archivo de Python denominado parquet_etl_sample.py en la máquina local.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

Copia de la aplicación de PySpark en HDFS

Almacene la aplicación en HDFS a fin de que el clúster tenga acceso a ella para ejecutarla. Como procedimiento recomendado, normalice y controle las ubicaciones de las aplicaciones dentro del clúster para simplificar la administración.

En este caso de uso de ejemplo, todas las aplicaciones de canalización de ETL se almacenan en la ruta de acceso hdfs:/apps/ETL-Pipelines. La aplicación de ejemplo se almacena en hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py.

Ejecute el comando siguiente para cargar parquet_etl_sample.py desde la máquina de almacenamiento provisional o desarrollo local en el clúster de HDFS.

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Ejecución de la aplicación de Spark

Use el comando siguiente para enviar la aplicación a Spark para Clústeres de macrodatos de SQL Server a fin de ejecutarla.

El comando azdata ejecuta la aplicación mediante parámetros especificados normalmente. Para obtener las opciones de parámetro completas de azdata bdc spark batch create, vea azdata bdc spark.

La aplicación requiere el parámetro de configuración spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation. Por lo tanto, el comando usa la opción --config. Este valor muestra cómo pasar configuraciones a la sesión de Spark.

Puede usar la opción --config para especificar varios parámetros de configuración. También puede especificarlos en la sesión de la aplicación si establece la configuración en el objeto SparkSession.

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Advertencia

El parámetro "name" o "n" del nombre del lote debe ser único cada vez que se crea un nuevo lote.

Supervisión de trabajos de Spark

Los comandos azdata bdc spark batch proporcionan acciones de administración para los trabajos por lotes de Spark.

Para enumerar todos los trabajos en ejecución, ejecute el comando siguiente.

  • El comando azdata:

    azdata bdc spark batch list -o table
    
  • El comando curl con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

Para obtener información de un lote de Spark con el identificador especificado, ejecute el comando siguiente. Se devuelve batch id de spark batch create.

  • El comando azdata:

    azdata bdc spark batch info --batch-id 0
    
  • El comando curl con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

Para obtener información de estado de un lote de Spark con el identificador especificado, ejecute el comando siguiente.

  • El comando azdata:

    azdata bdc spark batch state --batch-id 0
    
  • El comando curl con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

Para obtener los registros de un lote de Spark con el identificador especificado, ejecute el comando siguiente.

  • El comando azdata:

    azdata bdc spark batch log --batch-id 0
    
  • El comando curl con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

Pasos siguientes

Para obtener información sobre la solución de problemas del código de Spark, vea Solución de problemas del cuaderno pyspark.

Hay disponible código de ejemplo completo de Spark en los ejemplos de Spark para Clústeres de macrodatos de SQL Server en GitHub.

Para obtener más información sobre los clústeres de macrodatos de SQL Server y los escenarios relacionados, vea Clústeres de macrodatos de SQL Server.