Поделиться через


Отправка заданий Spark с помощью программ командной строки

Область применения: SQL Server 2019 (15.x)

В этой статье приводятся рекомендации по использованию программ командной строки для выполнения заданий Spark в Кластерах больших данных SQL Server.

Внимание

Поддержка надстройки "Кластеры больших данных" Microsoft SQL Server 2019 будет прекращена. Мы прекратим поддержку Кластеров больших данных SQL Server 2019 28 февраля 2025 г. Все существующие пользователи SQL Server 2019 с Software Assurance будут полностью поддерживаться на платформе, и программное обеспечение будет продолжать поддерживаться с помощью накопительных обновлений SQL Server до этого времени. Дополнительные сведения см. в записи блога объявлений и в статье о параметрах больших данных на платформе Microsoft SQL Server.

Необходимые компоненты

  • Средства больших данных SQL Server 2019, настроенные и вошедший в кластер:
    • azdata
    • приложение curl для выполнения вызовов REST API к Livy.

Задания Spark, которые используют azdata или Livy

В этой статье приведены примеры использования шаблонов командной строки для отправки приложений Spark в Кластеры больших данных SQL Server.

Команды azdata bdc spark Azure Data CLI реализуют все возможности Spark в Кластерах больших данных SQL Server в командной строке. Эта статья посвящена отправке заданий. Но azdata bdc spark также поддерживает интерактивные режимы для Python, Scala, SQL и R с использованием команды azdata bdc spark session.

Если требуется непосредственная интеграция с REST API, используйте для отправки заданий стандартные вызовы Livy. В этой статье показано, как использовать программу командной строки curl в примерах Livy для выполнения вызова REST API. Подробный пример взаимодействия с конечной точкой Livy Spark с помощью кода Python см. на странице, посвященной использованию Spark из конечной точки Livy, на сайте GitHub.

Простая операция ETL, которая использует Кластеры больших данных Spark

Это приложение для извлечения, преобразования и загрузки (ETL) соответствует общему шаблону инжиниринга данных. Оно загружает табличные данные из пути к целевой зоне распределенной файловой системы Apache Hadoop (HDFS). Затем оно использует табличный формат для записи в путь к зоне с обработкой HDFS.

Скачайте набор данных примера приложения. Затем создайте приложения PySpark с помощью PySpark, Spark Scala или Spark SQL.

В следующих разделах вы найдете примеры упражнений для каждого решения. Выберите вкладку со своей платформой. Вы запустите приложение с помощью azdata или curl.

В этом примере используется следующее приложение PySpark. Он сохраняется как файл Python с именем parquet_etl_sample.py на локальном компьютере.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

Копирование приложения PySpark в HDFS

Сохраните приложение в HDFS, чтобы кластер мог получить доступ к нему для выполнения. Чтобы упростить администрирование, рекомендуется стандартизировать и администрировать расположения приложений в кластере.

В этом примере все приложения конвейера ETL будут храниться в папке hdfs:/apps/ETL-Pipelines. Пример приложения хранится в файле hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py.

Выполните следующую команду, чтобы отправить файл parquet_etl_sample.py с локального компьютера разработки или компьютера промежуточного хранения в кластер HDFS.

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Выполнение приложения Spark

С помощью приведенной ниже команды отправьте приложение Spark в Кластеры больших данных SQL Server для выполнения.

Команда azdata запускает приложение с использованием часто определяемых параметров. Полный список параметров для azdata bdc spark batch create см. в статье azdata bdc spark.

Для этого приложения требуется параметр конфигурации spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation. Поэтому в команде используется параметр --config. Этот пример демонстрирует, как передавать конфигурации в сеанс Spark.

Вы можете использовать параметр --config, чтобы указать несколько параметров конфигурации. Это можно также сделать в сеансе приложения, задав конфигурацию в объекте SparkSession.

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Предупреждение

Параметр name или n определяет имя пакета, которое должно быть уникальным при каждом создании пакета.

Мониторинг заданий Spark

Команды azdata bdc spark batch отвечают за выполнение действий по управлению пакетными заданиями Spark.

Чтобы вывести список всех текущих заданий, выполните следующую команду.

  • Команда azdata делает следующее:

    azdata bdc spark batch list -o table
    
  • Команда curl делает следующее (с использованием Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

Чтобы получить сведения для пакета Spark с заданным идентификатором, выполните следующую команду. spark batch create возвращает batch id.

  • Команда azdata делает следующее:

    azdata bdc spark batch info --batch-id 0
    
  • Команда curl делает следующее (с использованием Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

Чтобы получить сведения о состоянии для пакета Spark с заданным идентификатором, выполните следующую команду.

  • Команда azdata делает следующее:

    azdata bdc spark batch state --batch-id 0
    
  • Команда curl делает следующее (с использованием Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

Чтобы получить журналы для пакета Spark с заданным идентификатором, выполните следующую команду.

  • Команда azdata делает следующее:

    azdata bdc spark batch log --batch-id 0
    
  • Команда curl делает следующее (с использованием Livy):

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

Следующие шаги

Дополнительные сведения об устранении неполадок с кодом Spark см. в статье Устранение неполадок с записной книжкой pyspark.

Полный набор примеров кода Spark см. на странице с примерами Spark для Кластеров больших данных SQL Server на сайте GitHub.

Дополнительные сведения о Кластеры больших данных SQL Server и связанных сценариях см. в Кластеры больших данных SQL Server.