Краткое руководство по преобразованию данных с помощью потоков данных для сопоставления

В этом кратком руководстве служба Azure Synapse Analytics будет использоваться для создания конвейера, который преобразует данные из источника Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения) в приемник ADLS 2-го поколения с помощью потока данных для сопоставления. Шаблон конфигурации в этом кратком руководстве можно взять за основу для настройки преобразования данных с помощью потока данных для сопоставления.

При работе с этим кратким руководством вы выполните следующие действия:

  • создание конвейера с использованием действия "Поток данных" в Azure Synapse Analytics;
  • создание потока данных для сопоставления с четырьмя преобразованиями;
  • тестовый запуск конвейера;
  • Отслеживание действия "Поток данных"

Предварительные требования

  • Подписка Azure: Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.

  • Рабочая область Azure Synapse: создайте рабочую область Synapse с помощью портала Azure, следуя инструкциям, приведенным в статье Краткое руководство по созданию рабочей области Synapse.

  • Учетная запись хранения Azure: для хранения данных источника и приемника используется хранилище ADLS. Если у вас нет учетной записи хранения, создайте ее, следуя действиям в этом разделе.

    Файл, который мы будем преобразовывать в этом руководстве, MoviesDB.csv, можно найти здесь. Чтобы извлечь файл из GitHub, скопируйте его содержимое в свой текстовый редактор, чтобы сохранить его на локальном компьютере в виде CSV-файла. Сведения о передаче файла в учетную запись хранения см. в статье об отправке BLOB-объектов с помощью портала Azure. В примерах будет использоваться контейнер под названием Sample-Data.

После создания рабочей области Azure Synapse можно открыть Synapse Studio двумя способами:

  • Откройте рабочую область Synapse на портале Azure. Выберите Открыть в Synapse Studio карта Открыть в разделе Начало работы.
  • Откройте Azure Synapse Analytics и войдите в рабочую область.

Для целей этого краткого руководства в качестве примера мы используем рабочую область с именем adftest2020. При этом вы автоматически перейдете на домашнюю страницу Synapse Studio.

Домашняя страница Synapse Studio

Создание конвейера с помощью действия "Поток данных"

Конвейер содержит логический поток для выполнения набора действий. В этом разделе будет создан конвейер, содержащий действие Потока данных.

  1. Перейдите на вкладку Integrate (Интеграция). Нажмите значок плюса рядом с заголовком конвейеров и выберите "Конвейер".

    Создание конвейера

  2. На странице параметров Свойства конвейера введите в поле Имя значение TransformMovies.

  3. В разделе Перемещение и преобразование на панели Действия перетащите элемент Поток данных на холст конвейера.

  4. Во всплывающем окне Добавление потока данных выберите Создать новый поток данных ->Поток данных. Когда закончите, нажмите кнопку ОК.

    Создание потока данных

  5. Присвойте потоку данных имя TransformMovies на странице Свойства.

Встраивание логики преобразования в холст потока данных

После создания потока данных он будет автоматически отправлен на холст Потока данных. На этом шаге будет создан поток данных, который извлекает файл MoviesDB.csv из хранилища ADLS и вычисляет среднюю оценку комедий с 1910 по 2000 гг. Затем этот файл будет записан обратно в хранилище ADLS.

  1. Установите ползунок Отладка потока данных над холстом потока данных в состояние "Вкл.". Режим отладки позволяет в интерактивном режиме тестировать логику преобразования в динамическом кластере Spark. Подготовка кластеров Потоков данных занимает 5–7 минут, поэтому пользователям рекомендуем сначала включить отладку, если планируется разработка Потока данных. Дополнительные сведения см. в статье Режим отладки.

    Включение отладки с помощью ползунка

  2. На холсте потока данных добавьте источник, щелкнув поле Добавить источник.

  3. Присвойте источнику имя MoviesDB. Нажмите Создать, чтобы создать новый набор данных источника.

    Создание нового набора данных источника

  4. Выберите Azure Data Lake Storage 2-го поколения. Нажмите кнопку "Продолжить".

    Выбор Azure Data Lake Storage 2-го поколения

  5. Выберите DelimitedText. Нажмите кнопку "Продолжить".

  6. Присвойте набору данных имя MoviesDB. В раскрывающемся списке "Связанная служба" выберите Создать.

  7. На экране создания связанной службы присвойте имя связанной службе ADLS 2-го поколения ADLSGen2 и укажите метод проверки подлинности. Затем введите учетные данные подключения. В этом кратком руководстве для подключения к нашей учетной записи хранения используется ключ учетной записи. Можно нажать кнопку Проверить подключение, чтобы проверить правильность ввода учетных данных. После завершения нажмите Создать.

    Создание исходной связанной службы

  8. Вернитесь на экран создания набора данных и в поле Путь к файлу укажите расположение файла. В этом кратком руководстве файл MoviesDB.csv находится в контейнере Sample-Data. Так как файл содержит заголовки, установите флажок Первая строка в качестве заголовка. Выберите Из подключения/хранилища, чтобы импортировать схему заголовка непосредственно из файла в хранилище. Когда закончите, нажмите кнопку ОК.

    Параметры исходного набора данных

  9. Если кластер отладки запущен, перейдите на вкладку Предварительный просмотр данных преобразования источника и нажмите кнопку Обновить, чтобы получить моментальный снимок данных. Предварительный просмотр данных дает возможность убедиться, что преобразование настроено правильно.

    Предварительный просмотр данных

  10. Щелкните значок "плюс" рядом с узлом источника на холсте потока данных, чтобы добавить новое преобразование. Первое добавляемое преобразование — Фильтр.

    Добавление фильтра

  11. Назовите преобразование фильтра FilterYears. Щелкните поле "Выражение" рядом с полем Фильтр, чтобы открыть построитель выражений. Здесь нужно указать условие фильтрации.

  12. Построитель выражений потока данных позволяет интерактивно создавать выражения для использования в различных преобразованиях. Выражения могут включать встроенные функции, столбцы из входной схемы и задаваемые пользователем параметры. Дополнительные сведения о построении выражений см. в статье Построитель выражений Потока данных.

    В этом кратком руководстве будут отфильтрованы фильмы в жанре комедии, которые вышли в период между 1910 и 2000 годами. В связи с тем, что в настоящее время год является строкой, ее необходимо преобразовать в целое число с помощью функции toInteger(). Используйте операторы "больше или равно" (>=) и "меньше или равно" (<=) для сравнения значений года с литералами 1910 и 200-. Объедините эти выражения с помощью оператора && (и). Выражение будет выглядеть следующим образом:

    toInteger(year) >= 1910 && toInteger(year) <= 2000

    Чтобы узнать, какие фильмы являются комедиями, можно использовать функцию rlike(), позволяющую найти слово "комедия" в жанрах столбца. Объедините выражение rlike с выражением сравнения года и получите следующее выражение:

    toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    Укажите условие фильтрации

    Если кластер отладки активен, можно проверить логику, нажав кнопку Обновить, чтобы увидеть результат выражения в сравнении с используемыми входными данными. Реализовать эту логику с помощью языка выражений потока данных можно разными способами.

    После завершения работы с выражением нажмите кнопку Сохранить и завершить.

  13. Загрузите результаты предварительного просмотра данных, чтобы убедиться, что фильтр работает правильно.

  14. Следующее преобразование, которое необходимо добавить, — статистическая обработка в модификаторе схемы.

    Добавление статистической обработки

  15. Назовите агрегатное преобразование AggregateComedyRatings. На вкладке Группировать по выберите год из раскрывающегося списка, чтобы сгруппировать агрегаты по году выхода фильма.

    Параметры агрегата 1

  16. Перейдите на вкладку Статистическая обработка. В левом текстовом поле присвойте столбцу имя AverageComedyRating. Щелкните правой кнопкой мыши поле выражения, чтобы ввести выражение агрегирования с помощью построителя выражений.

    Параметры агрегата 2

  17. Чтобы получить среднее значение столбца Оценка, используйте агрегатную функцию avg(). Так как оценка является строковым значением, а avg() принимает числовые входные данные, необходимо преобразовать значение в число с помощью функции toInteger(). Это выражение выглядит следующим образом:

    avg(toInteger(Rating))

    После завершения нажмите Сохранить и завершить.

    Средняя оценка комедии

  18. Перейдите на вкладку Предварительный просмотр данных, чтобы просмотреть выходные данные преобразования. Обратите внимание, что здесь есть только два столбца: year и AverageComedyRating.

    Предварительный просмотр статистической обработки

  19. Затем необходимо добавить преобразование Приемник в разделе Назначение.

    Добавление приемника

  20. Назовите приемник Sink. Нажмите Создать, чтобы создать набор данных приемника.

  21. Выберите Azure Data Lake Storage 2-го поколения. Нажмите кнопку "Продолжить".

  22. Выберите DelimitedText. Нажмите кнопку "Продолжить".

  23. Назовите приемный набор данных MoviesSink. В качестве связанной службы выберите связанную службу ADLS 2-го поколения, созданную на шаге 7. Укажите выходную папку для записи данных. В этом кратком руководстве мы записываем данные в папку Output в контейнере Sample-Data. Папка не обязательно должна существовать заранее и может быть создана динамически. Задайте для параметра Использовать первую строку в качестве заголовка значение "истина" и выберите значение Нет для параметра Импорт схемы. Когда закончите, нажмите кнопку ОК.

    Свойства набора данных приемника

Теперь создание потока данных завершено. Все готово для его запуска в конвейере.

Запуск и отслеживание Потока данных

Перед публикацией можно выполнить отладку конвейера. На этом шаге будет активирован запуск отладки конвейера Потока данных. При предварительном просмотре данных они не записываются, а в режиме отладки данные записываются в место назначения приемника.

  1. Перейдите на холст конвейера. Нажмите кнопку Отладка, чтобы запустить отладку.

    Отладка конвейера

  2. При отладке конвейера для действий потока данных используется активный кластер отладки, но инициализация по-прежнему займет не менее минуты. Ход выполнения можно отслеживать на вкладке Выходные данные. После успешного выполнения щелкните значок с очками, чтобы открыть область мониторинга.

    Выходные данные отладки

  3. В области мониторинга можно увидеть количество строк и время, затраченное на каждый шаг преобразования.

    Мониторинг преобразования

  4. Щелкните преобразование, чтобы получить подробные сведения о столбцах и секционировании данных.

    Сведения о преобразовании

Если действия в этом кратком руководстве выполнены правильно, в папку приемника должны быть записаны 83 строки и 2 столбца. Данные можно проверить в хранилище BLOB-объектов.

Дальнейшие действия

Ознакомьтесь со следующими статьями, чтобы узнать о поддержке Azure Synapse Analytics: