使用命令行工具提交 Spark 作业

适用于: SQL Server 2019 (15.x)

本文提供指南来说明如何使用命令行工具在 SQL Server 大数据群集上运行 Spark 作业。

重要

Microsoft SQL Server 2019 大数据群集附加产品将停用。 对 SQL Server 2019 大数据群集的支持将于 2025 年 2 月 28 日结束。 具有软件保障的 SQL Server 2019 的所有现有用户都将在平台上获得完全支持,在此之前,该软件将继续通过 SQL Server 累积更新进行维护。 有关详细信息,请参阅公告博客文章Microsoft SQL Server 平台上的大数据选项

先决条件

使用 azdata 或 Livy 的 Spark 作业

本文提供示例来讲解如何使用命令行模式将 Spark 应用程序提交到 SQL Server 大数据群集。

Azure 数据 CLI azdata bdc spark 命令在命令行上显示 SQL Server 大数据群集 Spark 的所有功能。 本文重点介绍了如何提交作业。 但 azdata bdc spark 也支持通过 azdata bdc spark session 命令对 Python、Scala、SQL 和 R 使用交互模式。

如果你需要与 REST API 直接集成,请使用标准 Livy 调用来提交作业。 本文将在 Livy 示例中使用 curl 命令行工具来运行 REST API 调用。 有关展示如何使用 Python 代码与 Spark Livy 终结点交互的详细示例,请参阅 GitHub 上的从 Livy 终结点使用 Spark

使用大数据群集 Spark 的简单 ETL

这个提取、转换和加载 (ETL) 应用程序遵循常见的数据工程模式。 它从 Apache Hadoop 分布式文件系统 (HDFS) 登陆区域路径加载表格数据。 然后,它使用表格格式写入 HDFS 处理的区域路径。

下载示例应用程序的数据集。 然后,使用 PySpark、Spark Scala 或 Spark SQL 创建 PySpark 应用程序。

以下部分分别提供了每个解决方案的示例练习。 选择适合你的平台的选项卡。 你将使用 azdatacurl 运行应用程序。

此示例使用以下 PySpark 应用程序。 它被保存为本地计算机上一个名为 parquet_etl_sample.py 的 Python 文件。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

将 PySpark 应用程序复制到 HDFS

将应用程序存储在 HDFS 中,以便群集可以访问它来执行。 最佳做法是在群集中将应用程序位置进行标准化并进行治理,从而简化管理。

在此示例用例中,所有 ETL 管道应用程序都存储在 hdfs:/apps/ETL-Pipelines 路径上。 示例应用程序存储在 hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py 中。

运行以下命令,将 parquet_etl_sample.py 从本地开发计算机或过渡计算机上传到 HDFS 群集。

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

运行 Spark 应用程序

使用以下命令将应用程序提交到 SQL Server 大数据群集 Spark 以便执行。

azdata 命令通过使用通常指定的参数运行应用程序。 有关 azdata bdc spark batch create 的完整参数选项,请查看 azdata bdc spark

此应用程序需要 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 配置参数。 因此命令使用 --config 选项。 此设置展示了如何将配置传递到 Spark 会话。

可使用 --config 选项来指定多个配置参数。 还可以在应用程序会话内指定这些参数,方式是在 SparkSession 对象中设置配置。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

警告

每次创建一个新批时,批名称的“name”或“n”参数应是唯一的。

监视 Spark 作业

azdata bdc spark batch 命令提供 Spark 批处理作业的管理操作。

若要列出所有正在运行的作业,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch list -o table
    
  • curl 命令(使用 Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

若要获取具有给定 ID 的 Spark 批处理的相关信息,请运行以下命令。 从 spark batch create 返回 batch id

  • azdata 命令:

    azdata bdc spark batch info --batch-id 0
    
  • curl 命令(使用 Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

若要获取具有给定 ID 的 Spark 批处理的状态信息,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch state --batch-id 0
    
  • curl 命令(使用 Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

若要获取具有给定 ID 的 Spark 批处理的日志,请运行以下命令。

  • azdata 命令:

    azdata bdc spark batch log --batch-id 0
    
  • curl 命令(使用 Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

后续步骤

若要了解如何排查 Spark 代码问题,请查看 PySpark 笔记本故障排除

GitHub 上 SQL Server 大数据群集 Spark 示例中提供了全面的 Spark 示例代码。

若要详细了解 SQL Server 大数据群集和相关方案,请参阅 SQL Server 大数据群集