Шаблон для поэтапного накопления данных с помощью Dataflow Gen2

Это руководство занимает 15 минут и описывает, как постепенно накапливать данные в lakehouse с помощью Dataflow Gen2.

Для постепенного накопления данных в хранилище данных требуется метод загрузки только новых или обновленных данных. Этот метод можно выполнить с помощью запроса для фильтрации данных на основе назначения данных. В этом руководстве показано, как создать поток данных для загрузки данных из источника OData в озеро и добавление запроса в поток данных для фильтрации данных на основе назначения данных.

Ниже приведены шаги высокого уровня, описанные в этом руководстве.

  • Создайте поток данных для загрузки данных из источника OData в озеро.
  • Добавьте запрос в поток данных, чтобы отфильтровать данные на основе назначения данных.
  • (Необязательно) перезагрузка данных с использованием ноутбуков и пайплайнов.

Требования

У вас должна быть рабочая область с поддержкой Microsoft Fabric. Если у вас еще нет одного, обратитесь к статье "Создание рабочей области". Кроме того, в этом руководстве предполагается, что вы используете представление схемы в Dataflow 2-го поколения. Чтобы проверить, используете ли вы представление диаграммы, на верхней ленте перейдите к представлению и убедитесь, что выбрано представление схемы.

Создайте поток данных для загрузки данных из источника OData в lakehouse

В этом разделе описано, как создать поток данных для загрузки данных из источника OData в озеро.

  1. Создайте новый lakehouse в рабочей области.

    Снимок экрана: диалоговое окно создания lakehouse.

  2. Создайте поток данных 2-го поколения в рабочей области.

    снимок экрана с раскрывающимся меню для создания потока данных.

  3. Добавьте новый источник в поток данных. Выберите источник OData и введите следующий URL-адрес: https://services.OData.org/V4/Northwind/Northwind.svc

    Снимок экрана: диалоговое окно получения данных.

    Снимок экрана: соединитель OData.

    Снимок экрана: параметры OData.

  4. Выберите таблицу "Заказы" и нажмите кнопку "Далее".

    Снимок экрана: диалоговое окно выбора таблицы заказов.

  5. Выберите следующие столбцы, чтобы сохранить:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Снимок экрана: функция выбора столбцов.

    Снимок экрана: таблица выбора столбцов заказов.

  6. Измените тип данных OrderDate, RequiredDate и ShippedDate на datetime.

    Снимок экрана: функция изменения типа данных.

  7. Настройте место назначения данных для вашего lakehouse, используя следующие параметры:

    • Назначение данных: Lakehouse
    • Lakehouse: Выберите озеро, созданное на шаге 1.
    • Новое имя таблицы: Orders
    • Метод Update: Replace

    Снимок экрана с панелью «Lakehouse назначения данных».

    Снимок экрана с таблицей заказов хранилища данных.

    Снимок экрана с изменением настроек назначения озера данных.

  8. Нажмите кнопку "Далее" и опубликуйте поток данных.

    Снимок экрана: диалоговое окно публикации потока данных.

Теперь вы создали поток данных для загрузки данных из источника OData в озеро. Этот поток данных используется в следующем разделе для добавления запроса в поток данных для фильтрации данных на основе назначения данных. После этого можно использовать поток данных для перезагрузки данных с помощью записных книжек и конвейеров.

Добавление запроса в поток данных для фильтрации данных на основе назначения данных

В этом разделе добавляется запрос к потоку данных для фильтрации данных на основе данных в целевом лейкхаусе. Запрос получает максимальный OrderID в lakehouse в начале обновления потока данных и использует максимальный идентификатор OrderId, чтобы выбрать заказы с более высоким OrderId из источника и добавить их в назначение данных. Предполагается, что заказы добавляются в исходные данные в порядке возрастания OrderID. Если это не так, можно использовать другой столбец для фильтрации данных. Например, можно использовать столбец OrderDate для фильтрации данных.

Примечание.

Фильтры OData применяются в Fabric после получения данных из источника данных, однако для таких источников баз данных, как SQL Server, фильтр применяется в запросе, отправленном в серверный источник данных, и только отфильтрованные строки возвращаются в службу.

  1. После обновления потока данных снова откройте поток данных, созданный в предыдущем разделе.

    Снимок экрана: диалоговое окно открытого потока данных.

  2. Создайте новый запрос с именем IncrementalOrderID и получите данные из таблицы Orders в lakehouse, созданной в предыдущем разделе.

    Снимок экрана: диалоговое окно получения данных.

    Снимок экрана: соединитель Lakehouse.

    Снимок экрана, показывающий таблицу заказов в lakehouse.

    Снимок экрана: функция переименования запроса.

    Снимок экрана: переименованный запрос.

  3. Отключите промежуточное выполнение этого запроса.

    Снимок экрана, показывающий отключение функции развертывания.

  4. В предварительном просмотре данных щелкните правой кнопкой мыши OrderID столбец и выберите " Детализация".

    снимок экрана, показывающий функцию детализации.

  5. На ленте выберите Средства списка -> ->.

    Снимок экрана: функция максимального порядка статистики.

Теперь у вас есть запрос, который возвращает максимальный идентификатор OrderID в lakehouse. Этот запрос используется для фильтрации данных из источника OData. В следующем разделе добавляется запрос к потоку данных для фильтрации данных из источника OData на основе максимального значения OrderID в lakehouse.

  1. Вернитесь к запросу "Заказы" и добавьте новый шаг для фильтрации данных. Используйте следующие параметры:

    • Столбец: OrderID
    • Операция: Greater than
    • Значение: параметр IncrementalOrderID

    Снимок экрана: функция заказа больше функции фильтра.

    Снимок экрана: параметры фильтра.

  2. Разрешить объединение данных из источника OData и lakehouse, подтвердив следующее диалоговое окно:

    Снимок экрана: диалоговое окно

  3. Обновите назначение данных, чтобы использовать следующие параметры:

    • Метод Update: Append

    Снимок экрана: функция редактирования выходных параметров.

    Снимок экрана: существующая таблица заказов.

    Снимок экрана с параметрами для добавления настроек назначения озера данных.

  4. Опубликуйте поток данных.

    Снимок экрана: диалоговое окно публикации потока данных.

Поток данных теперь содержит запрос, который фильтрует данные из источника OData на основе максимального значения OrderID в lakehouse. Это означает, что в lakehouse загружаются только новые или обновленные данные. Следующий раздел использует поток данных для перезагрузки данных с помощью ноутбуков и конвейеров.

(Необязательно) перезагрузить данные с помощью ноутбуков и конвейеров

При необходимости можно перезагрузить определенные данные с помощью записных книжек и конвейеров. С помощью пользовательского кода на Python в записной книжке вы удаляете старые данные из системы хранения данных lakehouse. Затем, создав конвейерную систему, в которой вы сначала запускаете записную книжку, а затем последовательно выполняете поток данных, вы перезагружаете данные из источника OData в lakehouse. Блокноты поддерживают несколько языков, однако в данном руководстве используется PySpark. Pyspark — это API Python для Spark и используется в этом руководстве для запуска запросов Spark SQL.

  1. Создайте записную книжку в рабочей области.

    снимок экрана: диалоговое окно

  2. Добавьте следующий код PySpark в записную книжку:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Запустите записную книжку, чтобы убедиться, что данные удалены из lakehouse.

  4. Создайте конвейер в рабочей области.

    Снимок экрана: диалоговое окно нового конвейера.

  5. Добавьте новое действие блокнота в поток обработки и выберите блокнот, созданный на предыдущем шаге.

    Снимок экрана с диалоговым окном добавления записи в записную книжку.

    Снимок экрана: диалоговое окно выбора записной книжки.

  6. Добавьте новое действие потока данных в конвейер и выберите поток данных, созданный в предыдущем разделе.

    Снимок экрана: диалоговое окно добавления активности потока данных.

    Снимок экрана: диалоговое окно выбора потока данных.

  7. Соедините действие записной книжки с действием потока данных с триггером успешного завершения.

    Снимок экрана: диалоговое окно действий подключения.

  8. Сохраните и запустите конвейер.

    Снимок экрана: диалоговое окно запуска конвейера.

Теперь у вас есть конвейер, который удаляет старые данные из лейкхауса и перезагружает данные из источника OData обратно в лейкхаус. С помощью этой конфигурации можно регулярно загружать данные из источника OData в lakehouse.