Share via


使用命令列工具提交 Spark 作業

適用於:SQL Server 2019 (15.x)

本文提供如何使用命令列工具在 SQL Server 巨量資料叢集上執行 Spark 作業的指引。

重要

Microsoft SQL Server 2019 巨量資料叢集附加元件將會淘汰。 SQL Server 2019 巨量資料叢集的支援將於 2025 年 2 月 28 日結束。 平台上將完全支援具有軟體保證之 SQL Server 2019 的所有現有使用者,而且該軟體將會持續透過 SQL Server 累積更新來維護,直到該時間為止。 如需詳細資訊,請參閱公告部落格文章Microsoft SQL Server 平台上的巨量資料選項

必要條件

使用 azdata 或 Livy 的 Spark 作業

本文提供如何使用命令列模式將 Spark 應用程式提交至 SQL Server 巨量資料叢集的範例。

Azure Data CLI azdata bdc spark 命令 會在命令列上呈現 SQL Server 巨量資料叢集 Spark 的所有功能。 本文著重於作業提交。 但是 azdata bdc spark 也透過 azdata bdc spark session 命令支援 Python、Scala、SQL 和 R 的互動模式。

如果您需要與 REST API 的直接整合,請使用標準 Livy 呼叫來提交作業。 本文使用 Livy 範例中的 curl 命令列工具來執行 REST API 呼叫。 如需示範如何使用 Python 程式碼與 Spark Livy 端點互動的詳細範例,請參閱 GitHub 上的從 Livy 端點使用 Spark

使用巨量資料叢集 Spark 的簡單 ETL

此擷取、轉換和載入 (ETL) 應用程式遵循常見的資料工程模式。 會從 Apache Hadoop 分散式檔案系統 (HDFS) 登陸區域路徑載入表格式資料。 然後會使用資料表格式寫入 HDFS 處理的區域路徑。

下載範例應用程式資料集。 然後使用 PySpark、Spark Scala 或 Spark SQL 建立 PySpark 應用程式。

在下列各節中,您會找到每個解決方案的範例練習。 選取適用於您的平台的索引標籤。 您會使用 azdatacurl 來執行應用程式。

此範例使用下列 PySpark 應用程式。 會在本機電腦上儲存為名為 parquet_etl_sample.py 的 Python 檔案。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

將 PySpark 應用程式複製到 HDFS

將應用程式儲存在 HDFS 中,讓叢集可以存取以便執行。 最佳做法是標準化和管理叢集中的應用程式位置,以簡化系統管理。

在此範例使用案例中,所有 ETL 管線應用程式都會儲存在 hdfs:/apps/ETL-Pipelines 路徑上。 範例應用程式儲存在 hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py

執行下列命令,將 parquet_etl_sample.py 從本機開發或預備環境電腦上傳至 HDFS 叢集。

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

執行 Spark 應用程式

使用下列命令將應用程式提交至 SQL Server 巨量資料叢集 Spark 以供執行。

azdata 命令會使用常見指定參數來執行應用程式。 如需 azdata bdc spark batch create 的完整參數選項,請參閱 azdata bdc spark

此應用程式需要 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 組態參數。 因此,命令會使用 --config 選項。 此設定示範如何將組態傳遞至 Spark 工作階段。

您可以使用 --config 選項來指定多個組態參數。 您也可以藉由在 SparkSession 物件中設定組態,在應用程式工作階段內加以指定。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

警告

每次建立新批次時,批次名稱的 "name" 或 "n" 參數應該是唯一的。

監視 Spark 作業

azdata bdc spark batch 命令 會提供 Spark 批次作業的管理動作。

若要列出所有執行中的作業,請執行下列命令。

  • azdata 命令:

    azdata bdc spark batch list -o table
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

若要針對具有指定識別碼的 Spark 批次取得資訊,請執行下列命令。 batch id 是從 spark batch create 傳回。

  • azdata 命令:

    azdata bdc spark batch info --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

若要針對具有指定識別碼的 Spark 批次取得狀態資訊,請執行下列命令。

  • azdata 命令:

    azdata bdc spark batch state --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

若要針對具有指定識別碼的 Spark 批次取得記錄,請執行下列命令。

  • azdata 命令:

    azdata bdc spark batch log --batch-id 0
    
  • 使用 Livy 的 curl 命令:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

下一步

如需針對 Spark 程式碼進行疑難排解的詳細資訊,請參閱針對 PySpark 筆記本進行疑難排解

如需完整的 Spark 範例程式碼,請參閱 GitHub 上的 SQL Server 巨量資料叢集 Spark 範例

如需 SQL Server 巨量資料叢集和相關案例的詳細資訊,請參閱 SQL Server 巨量資料叢集