Трансформация соединения в потоке данных для сопоставления

ПРИМЕНИМО К: Фабрика данных Azure Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Фабрика данных Azure с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

Потоки данных доступны как в конвейерах Фабрика данных Azure, так и в Azure Synapse Analytics конвейерах. Эта статья относится к сопоставлению потоков данных. Если вы не знакомы с преобразованиями, ознакомьтесь с вводной статьей "Преобразование данных с помощью сопоставления потоков данных".

Совет

Эквивалентное преобразование (запросы слияния) в Dataflow 2-го поколения см. в руководстве по потоку данных 2-го поколения для сопоставления пользователей потока данных.

Используйте преобразование "Соединение" для объединения данных из двух источников или потоков в поток данных для сопоставления. Выходной поток включает все столбцы из обоих источников, соответствующие условию соединения.

Типы соединения

Потоки данных для сопоставления в настоящее время поддерживают пять различных типов соединений.

Внутреннее соединение

Внутреннее соединение выводит только строки, которые имеют совпадающие значения в обеих таблицах.

Левое внешнее соединение

Левое внешнее соединение возвращает все строки из левого потока и сопоставленные записи из правого потока. Если строка из левого потока не имеет соответствия, выходным столбцам из правого потока задается значение NULL. Выходные данные — это строки, возвращаемые внутренним соединением, а также несовпаденные строки из левого потока.

Примечание.

Движок Spark, используемый потоками данных, иногда дает сбой из-за возможных декартовых продуктов в условиях соединения. Если это происходит, можно переключиться на пользовательское кросс-соединение и вручную ввести условие соединения. Это может привести к замедлению производительности потоков данных, так как подсистеме выполнения может потребоваться вычислить все строки с обеих сторон связи, а затем фильтровать строки.

Правое внешнее соединение

Правое внешнее соединение возвращает все строки из правого потока и сопоставленные записи из левого потока. Если строка из правого потока не имеет соответствия, выходным столбцам из левого потока задается значение NULL. Выходные данные — это строки, возвращаемые внутренним соединением, а также несовпаденные строки из правого потока.

Полное внешнее соединение

Полное внешнее соединение выводит все столбцы и строки с обеих сторон со значениями NULL для столбцов, которые не совпадают.

Пользовательское перекрестное соединение

Перекрестное соединение выводит перекрестное произведение двух потоков на основе условия. Если вы используете условие, которое не является равенством, необходимо указать пользовательское выражение в качестве условия перекрестного соединения. Выходной поток — это все строки, соответствующие условию соединения.

Вы можете использовать этот тип соединения для неэквивалентных соединений и условий OR.

Если вы хотите явно создать полный декартов продукт, в каждом из двух независимых потоков перед соединением используйте преобразование "Производный столбец", чтобы создать искусственный ключ для сопоставления. Например, создайте новый столбец в компоненте Производный столбец в каждом потоке, назовите его SyntheticKey и присвойте ему значение 1. Затем используйте a.SyntheticKey == b.SyntheticKey в качестве пользовательского выражения соединения.

Примечание.

Обязательно включите хотя бы один столбец с каждой стороны левой и правой связей в пользовательском перекрестном соединении. Выполнение перекрестных соединений со статическими значениями вместо столбцов с каждой стороны приводит к полному сканированию всего набора данных, в результате чего поток данных работает плохо.

Нечеткое соединение

Вы можете выбрать соединение на основе логики нечеткого соединения, а не точного соответствия значений столбцов, установив флажок "Использовать нечеткое соответствие".

  • Объединение частей текста: используйте этот параметр для поиска совпадений, удаляя пробел между словами. Например, если эта опция включена, Data Factory считается совпадающим с DataFactory.
  • Столбец показателя подобия: при необходимости можно сохранить показатель соответствия для каждой строки в столбце; для этого нужно ввести здесь имя нового столбца, в котором будет сохранено это значение.
  • Порог сходства: выберите значение от 60 до 100 в качестве процента соответствия значений в выбранных столбцах.

Нечеткое соединение

Примечание.

Нечеткое соответствие в настоящее время работает только с типами строковых столбцов и с типами внутренних, левых внешних и полных внешних соединений. При использовании нечетких сопоставленных соединений необходимо отключить оптимизацию трансляции.

Настройка

  1. В поле Правый поток выберите, к какому потоку данных вы присоединяетесь.
  2. Выберите тип соединения.
  3. Выберите, какие ключевые столбцы вы хотите использовать для сопоставления в вашем условии соединения. По умолчанию поток данных ищет равенство между одним столбцом в каждом потоке. Чтобы выполнить сравнение с помощью вычисленного значения, наведите указатель мыши на раскрывающийся список столбцов и выберите Вычисляемый столбец.

Снимок экрана преобразования join

Неэквивалентные соединения

Чтобы использовать условный оператор, например "не равно" (!=) или "больше" (>) в условиях соединения, измените раскрывающийся список операторов между двумя столбцами. Для неэквивалентных соединений требуется широковещательная рассылка по крайней мере одного из двух потоков с помощью Фиксированного вещания на вкладке Оптимизация.

Неэквивалентное соединение

Оптимизация производительности соединения

В отличие от операции объединения в таких инструментах, как SQL Server Integration Services, преобразование объединения не обязательно является слиянием. Для ключей соединения не требуется сортировка. Операция соединения происходит на основе оптимального типа соединения в Spark и может быть либо широковещательным, либо на стороне map.

Оптимизация преобразования соединения

При объединении, поиске и преобразовании "существование", если один или оба потока данных помещаются в память рабочего узла, можно оптимизировать производительность, включив Распространение. По умолчанию движок Spark автоматически решает, следует ли транслировать одну из сторон. Чтобы вручную выбрать канал для трансляции, выберите Фиксированный.

Не рекомендуется отключать широковещательную трансляцию с помощью параметра Выкл., пока соединения не столкнутся с ошибками времени ожидания.

Самосоединение

Чтобы самостоятельно соединить поток данных с самим собой, переименуйте имеющийся поток с использованием операции 'SELECT'. Создайте ветвь, щелкнув значок плюса рядом с преобразованием и выбрав Новая ветвь. Добавьте преобразование Select для алиасирования исходного потока. Добавьте преобразование "Соединение" и выберите исходный поток в качестве левого потока, а преобразование "Выбор" — в качестве правого потока.

Самосоединение

Тестирование условий соединения

При тестировании преобразований соединения с предварительным просмотром данных в режиме отладки используйте небольшой набор известных данных. При выборке строк из большого набора данных невозможно предсказать, какие строки и ключи считываются для тестирования. Результат имеет недетерминированный характер, то есть условия соединения могут не возвращать совпадений.

Скрипт потока данных

Синтаксис

<leftStream>, <rightStream>
    join(
        <conditionalExpression>,
        joinType: { 'inner'> | 'outer' | 'left_outer' | 'right_outer' | 'cross' }
        broadcast: { 'auto' | 'left' | 'right' | 'both' | 'off' }
    ) ~> <joinTransformationName>

Пример внутреннего соединения

В этом примере используется преобразование соединения с именем JoinMatchedData , которое принимает левый поток TripData и правый поток TripFare. Условие соединения является выражением hack_license == { hack_license} && TripData@medallion == TripFare@medallion && vendor_id == { vendor_id} && pickup_datetime == { pickup_datetime}, которое возвращает значение TRUE, если столбцы hack_license, medallion, vendor_id и pickup_datetime в каждом потоке совпадают. Значение параметра joinType — 'inner'. Мы включаем трансляцию только в левом потоке, поэтому broadcast имеет значение 'left'.

В пользовательском интерфейсе это преобразование выглядит следующим образом:

На снимке экрана показано преобразование с выбранной вкладкой

Скрипт потока данных для этого преобразования находится в этом фрагменте кода:

TripData, TripFare
    join(
        hack_license == { hack_license}
        && TripData@medallion == TripFare@medallion
        && vendor_id == { vendor_id}
        && pickup_datetime == { pickup_datetime},
        joinType:'inner',
        broadcast: 'left'
    )~> JoinMatchedData

Пример пользовательского перекрестного соединения

В этом примере используется преобразование соединения с именем JoiningColumns , которое принимает левый поток LeftStream и правый поток RightStream. Это преобразование принимает два потока и объединяет все строки, где столбец leftstreamcolumn больше столбца rightstreamcolumn. Значение параметра joinType — cross. Трансляция не включена broadcast имеет значение 'none'.

В пользовательском интерфейсе это преобразование выглядит следующим образом:

На снимке экрана показано преобразование с выбранной вкладкой

Скрипт потока данных для этого преобразования находится в фрагменте кода:

LeftStream, RightStream
    join(
        leftstreamcolumn > rightstreamcolumn,
        joinType:'cross',
        broadcast: 'none'
    )~> JoiningColumns

После объединения данных создайте производный столбец и передайте данные в целевое хранилище данных.