Поделиться через


Задача потока данных

В состав задачи потока данных входит подсистема обработки потока данных, перемещающая данные между источником и назначением и позволяющая пользователю преобразовывать, очищать и изменять данные в процессе перемещения. Добавление задачи потока данных в поток управления пакета позволяет пакету извлекать, преобразовывать и загружать данные.

Поток данных состоит по крайней мере из одного компонента потока данных, но обычно является набором связанных компонентов потока данных: источники извлекают данные; преобразования изменяют, маршрутизируют или суммируют данные; назначения загружают данные. Компоненты связаны в потоке данных путями. Каждый путь связывает два компонента, являющиеся началом и концом пути. Дополнительные сведения см. в разделе Элементы потока данных.

Во время выполнения задача потока данных создает план выполнения в соответствии с потоком данных, а подсистема обработки потока данных выполняет этот план. Возможно создание задачи потока данных, не содержащей потока данных, но задача будет выполнена, только если она включает по меньшей мере один поток данных.

На следующей диаграмме показана задача потока данных с одним потоком данных.

Поток данных

Задача потока данных может включать несколько потоков данных. Если задача копирует несколько наборов данных и порядок, в котором производится копирование, неважен, может оказаться полезным включить в задачу потока данных несколько потоков данных. Например, можно включить пять потоков данных, каждый из которых копирует данные из неструктурированного файла в другую таблицу измерения хранилища данных со схемой «звезда».

Однако, если имеется несколько потоков данных в пределах одной задачи потока данных, порядок выполнения определяется подсистемой обработки потока данных. Таким образом, если порядок важен, пакет должен использовать несколько задач потока данных, каждая из которых содержит один поток данных. При этом можно будет задать ограничение очередностью для управления очередностью выполнения задач.

На следующей диаграмме показана задача потока данных с несколькими потоками данных.

Потоки данных

Пакет может включать несколько задач «Поток данных», что особенно характерно для сложных пакетов. Например, если пакет требует, чтобы потоки данных были запущены в указанной последовательности или чтобы между потоками данных были выполнены другие задачи, необходимо указывать отдельную задачу потока данных для каждого потока данных.

Задача потока данных управляет также и потоками ошибок. Во время выполнения могут возникать ошибки на уровне строк при преобразовании данных, выполнении уточняющего запроса или вычислении выражений компонентами потока данных. Например, столбец данных, содержащий строковое значение, не удалось преобразовать в целое число, либо в выражении была попытка деления на ноль. Обе операции вызывают ошибки, и строки, содержащие ошибки, можно обработать отдельно при помощи потока ошибок. Дополнительные сведения об использовании потоков ошибок в потоке данных пакета см. в разделе Обработка ошибок в данных в потоке данных.

Чтобы выполнить массовую вставку из текстовых файлов в базу данных SQL Server вместо задачи потока данных и самого потока данных можно использовать задачу «Массовая вставка». Однако задача «Массовая вставка» не может преобразовывать данные. Дополнительные сведения см. в разделе Задача «Массовая вставка».

Использование выражений свойств с элементами потока данных

Некоторые из компонентов потоков данных — источники, фильтры и цели — поддерживают использование выражений с некоторыми своими свойствами. Выражение свойств — это выражение, которое заменяет значение свойства при загрузке пакета. Во время выполнения пакет будет использовать обновленные значения свойств. Выражения строятся с использованием синтаксиса выражений служб Integration Services и могут включать функции служб Integration Services, операторы, идентификаторы и переменные. Дополнительные сведения см. в разделах Справочник по выражениям служб Integration Services, Использование выражений в пакетах и Использование выражений свойств в пакетах.

При построении пакета в среде Business Intelligence Development Studio свойства компонентов потоков данных, которые поддерживают выражения свойств, публикуются в задаче потока данных, которой они принадлежат. Чтобы добавить, изменить или удалить выражения свойств из компонентов потока данных, щелкните задачу потока данных и в окне «Свойства» или в редакторе задачи добавьте, измените или удалите выражения свойств. Выражения свойств для задачи потока данных можно добавлять, изменять или удалять в окне «Свойства».

Если в потоке данных содержатся компоненты, использующие выражения свойств, эти выражения также отображаются в окне «Свойства». Для просмотра выражений выберите задачу потока данных, которой принадлежат компоненты. Свойства можно просматривать по категориям или в алфавитном порядке. Если в окне «Свойства» отображается представление по категориям, все выражения, не использующиеся в конкретном свойстве, приводятся в списке категории Прочие. Если выражения представлены в алфавитном порядке, они отображаются по имени компонента потока данных.

Записи журнала

Службы Integration Services предоставляют журнальные события, которые доступны всем задачам. Для многих задач службы Integration Services предоставляют также пользовательские записи журнала. Дополнительные сведения см. в разделах Реализация ведения журналов в пакетах и Пользовательские сообщения для ведения журнала. Задача потока данных содержит следующие пользовательские записи журнала.

Запись журнала

Описание

BufferSizeTuning

Указывает, что задача потока данных изменила размер буфера. Эта запись журнала описывает причины изменения размера и фиксирует новый временный размер буфера.

OnPipelinePostEndOfRowset

Означает, что компонент получил сигнал конца набора строк, который устанавливается при последнем вызове метода ProcessInput. Запись делается для каждого компонента в потоке данных, который обрабатывает ввод. Запись включает имя компонента.

OnPipelinePostPrimeOutput

Указывает, что компонент завершил последний вызов метода PrimeOutput. В зависимости от потока данных, возможно формирование нескольких записей в журнале. Если компонент является источником, эта запись журнала означает, что компонент завершил обработку строк.

OnPipelinePreEndOfRowset

Показывает, что компонент уже готов получить сигнал конца набора строк, который устанавливается при последнем вызове метода ProcessInput. Запись делается для каждого компонента в потоке данных, который обрабатывает ввод. Запись включает имя компонента.

OnPipelinePrePrimeOutput

Показывает, что компонент готов получить свой вызов из метода PrimeOutput. В зависимости от потока данных, возможно формирование нескольких записей в журнале.

OnPipelineRowsSent

Сообщает количество строк, предоставленных входу компонента с помощью вызова метода ProcessInput. Запись журнала включает имя компонента.

PipelineBufferLeak

Предоставляет сведения обо всех компонентах, которые удерживают буферы от уничтожения после того, как диспетчер буферов завершил свое выполнение. Если буфер все еще существует, ресурсы буферов не освобождаются и могут вызвать утечку памяти. Запись журнала предоставляет имя компонента и идентификатор буфера.

PipelineComponentTime

Сообщает о времени (в миллисекундах), которое компонент тратит на каждый из пяти основных шагов обработки — Validate, PreExecute, PostExecute, ProcessInput и ProcessOutput.

PipelineExecutionPlan

Сообщает о плане выполнения потока данных. План выполнения предоставляет сведения о том, как буферы будут отсылаться компонентам. Эти сведения в сочетании с записью журнала PipelineExecutionTrees описывают работу задачи потока данных.

PipelineExecutionTrees

Сообщает о дереве выполнения макета в потоке данных. Диспетчер ядра подсистемы обработки потока данных использует деревья для построения плана выполнения потока данных.

PipelineInitialization

Предоставляет сведения об инициализации задачи. Эти сведения включают каталоги, используемые для временного хранения данных большого объема типа BLOB, размер буфера по умолчанию и количество строк в буфере. В зависимости от настройки задачи потока данных, возможно формирование нескольких записей в журнале.

Эти записи журнала предоставляют достаточные сведения об исполнении задачи потока данных при каждом запуске пакета. Если пакеты запускаются периодически, то можно получить сведения, которые со временем предоставят важные данные предыстории о выполнении задачи, о вопросах, влияющих на производительность, и об объемах обрабатываемых задачей данных.

Дополнительные сведения о об использовании таких записей журнала для наблюдения и повышения производительности потока данных см. в следующих разделах.

Образцы сообщений от задачи потока данных

В следующей таблице приведен список образцов сообщений для записей журнала в очень простом пакете. Пакет использует источник данных «OLE DB» для извлечения данных из таблицы, преобразование «Сортировка» для сортировки данных и назначение «OLE DB» для записи данных в другую таблицу.

Запись журнала

Сообщения

BufferSizeTuning

Строки в буфере 0-го типа могут привести к тому, что размер буфера станет больше заданного максимума. В буфере такого типа может быть только 9637 строк.

Строки в буфере 2-го типа могут привести к тому, что размер буфера станет больше заданного максимума. В буфере такого типа может быть только 9497 строк.

Строки в буфере 3-го типа могут привести к тому, что размер буфера станет больше заданного максимума. В буфере такого типа может быть только 9497 строк.

OnPipelinePostEndOfRowset

Компонент получит сигнал об окончании набора строк. : 1180 : Сортировка : 1181 : Вход сортировки

Компонент получит сигнал об окончании набора строк. : 1291 : Назначение «OLE DB» : 1304 : Вход назначения «OLE DB»

OnPipelinePostPrimeOutput

Компонент завершил вызов PrimeOutput. : 1180 : Сортировка

Компонент завершил вызов PrimeOutput. : 1 : Источник «OLE DB»

OnPipelinePreEndOfRowset

Компонент завершил обработку всех своих строк. : 1180 : Сортировка : 1181 : Вход сортировки

Компонент завершил обработку всех своих строк. : 1291 : Назначение «OLE DB» : 1304 : Вход назначения «OLE DB»

OnPipelinePrePrimeOutput

PrimeOutput будет вызван в компоненте. : 1180 : Сортировка

PrimeOutput будет вызван в компоненте. : 1 : Источник «OLE DB»

OnPipelineRowsSent

В качестве входных данных компоненту потока данных были предоставлены строки. : : 1185 : Выходные данные источника «OLE DB» : 1180 : Сортировка : 1181 : Вход сортировки : 76

В качестве входных данных компоненту потока данных были предоставлены строки. : : 1308 : Выход сортировки : 1291 : Назначение «OLE DB» : 1304 : Вход назначения «OLE DB» : 76

PipelineComponentTime

Компонент «Вычисление LineItemTotalCost» (3522) затратил 356 миллисекунд на шаге ProcessInput.

Компонент «Сложение Quantity и LineItemTotalCost» (3619) затратил 79 миллисекунд на шаге ProcessInput.

Компонент «Вычисление средней стоимости» (3662) затратил 16 миллисекунд на шаге ProcessInput.

Компонент «Сортировка по значению ProductID» (3717) затратил 125 миллисекунд на шаге ProcessInput.

Компонент «Загрузка данных» (3773) затратил 0 миллисекунд на шаге ProcessInput.

Компонент «Извлечение данных» (3869) затратил 688 миллисекунд на шаге PrimeOutput, заполняя буферы выхода «Выход источника OLE DB» (3879).

Компонент «Сложение Quantity и LineItemTotalCost» (3619) затратил 141 миллисекунду на шаге PrimeOutput, заполняя буферы выхода «Выход статистического выражения 1" (3621)».

Компонент «Сортировка по значению ProductID» (3717) затратил 16 миллисекунд на шаге PrimeOutput, заполняя буферы выхода компонента «Выход сортировки» (3719).

PipelineExecutionPlan

SourceThread0

Управление: 1

Влияния: 1180 1291

Выходной список операций

CreatePrimeBuffer первого типа для выходных данных с идентификатором ID 11.

SetBufferListener: «WorkThread0» для входных данных с идентификатором ID 1181

CreatePrimeBuffer третьего типа для выходных данных с идентификатором ID 12.

CallPrimeOutput для компонента «Источник OLE DB» (1)

Конец выходного списка операций

Завершение SourceThread0

WorkThread0

Управление: 1180

Влияния: 1180 1291

Список действий над входными данными, входные данные с идентификатором ID 1181 (Ожидается 1 EOR (конец набора строк))

CallProcessInput для входных данных с идентификатором ID 1181 для компонента «Сортировка» (1180) для представления типа 2

Завершающий список действий над входными данными для входных данных 1181

Выходной список операций

CreatePrimeBuffer четвертого типа для выходных данных с идентификатором ID 1182.

SetBufferListener: «WorkThread1» для входных данных с идентификатором ID 1304

CallPrimeOutput для компонента «Сортировка» (1180)

Конец выходного списка операций

Завершение WorkThread0

WorkThread1

Управление: 1291

Влияния: 1291

Список действий над входными данными, входные данные с идентификатором ID 1304 (Ожидается 1 EOR (конец набора строк))

CallProcessInput для входных данный с идентификатором ID 1304 для компонента «Назначение OLE DB» (1291) для представления типа 5

Завершающий список действий над входными данными для входных данных 1304

Выходной список операций

Конец выходного списка операций

Завершение WorkThread1

PipelineExecutionTrees

Начало выполнения дерева 0

Выходные данные «Выход источника OLE DB» (11)

Входные данные «Вход сортировки» (1181)

Завершение выполнения дерева 0

Начало выполнения дерева 1

Выходные данные «Ошибочные выходные данные источника OLE DB» (12)

Завершение выполнения дерева 1

Начало выполнения дерева 2

Выходные данные «Выход сортировки» (1182)

Входные данные «Вход назначения OLE DB» (1304)

Выходные данные «Ошибочные выходные данные назначения OLE DB» (1305)

Завершение выполнения дерева 2

PipelineInitialization

Не было предоставлено место хранения временных данных типа BLOB. Диспетчер буферов будет предполагать, что эти каталоги размещения временных данных указаны в переменных среды TEMP или TMP.

Размер буфера по умолчанию равен 10 485 760 байт.

По умолчанию буферы предназначены для хранения 10000 строк.

Поток данных не удалит неиспользованные компоненты, потому что свойство RunInOptimizedMode имеет значение False.

Многие регистрируемые события оставляют по нескольку записей, а сообщения для некоторых записей журнала содержат сложные данные. Чтобы упростить понимание и взаимодействие содержимого сложных сообщений, можно провести синтаксический анализ текста сообщения. В зависимости от положения журналов можно использовать инструкции Transact-SQL или компонент сценария для разделения сложного текста по столбцам или преобразования в другие форматы, которые окажутся более удобными.

Например, следующая таблица содержит сообщение «В качестве входных данных компоненту потока данных были предоставлены строки. : : 1185 : Выходные данные источника «OLE DB» : 1180 : Сортировка : 1181 : Вход сортировки : 76», разделенное на столбцы. Сообщение записано событием OnPipelineRowsSent, когда строки были отправлены из источника «OLE DB» в преобразование «Сортировка».

Столбец

Описание

Значение

PathID

Значение свойства ID пути между источником «OLE DB» и преобразованием «Сортировка».

1185

PathName

Значение свойства Name пути.

Выход источника «OLE DB»

ComponentID

Значение свойства ID преобразования «Сортировка».

1180

ComponentName

Значение свойства Name преобразования «Сортировка».

Сортировка

InputID

Значение свойства ID входных данных для преобразования «Сортировка».

1181

InputName

Значение свойства Name входных данных для преобразования «Сортировка».

Вход сортировки

RowsSent

Количество строк, посланных в качестве входных данных на преобразование «Сортировка».

76

Источники

Следующие источники имеют свойства, которые могут обновляться через выражения свойств.

Дополнительные сведения см. в разделе Пользовательские свойства источника.

Преобразования

Назначения

Следующие назначения обладают свойствами, которые могут обновляться через выражения свойств.

Дополнительные сведения см. в разделе Пользовательские свойства назначений.

Настройка задачи потока данных

Задать свойства можно в окне Свойства или программно.

Дополнительные сведения о настройке этих свойств в окне Свойства см. в следующем разделе:

Программная настройка задачи потока данных

Дополнительные сведения о программном добавлении задачи потока данных к пакету и установке свойств потока данных, см. в следующем разделе:

Значок служб Integration Services (маленький) Будьте в курсе новых возможностей cлужб Integration Services

Чтобы загружать новейшую документацию, статьи, образцы и видеоматериалы от корпорации Майкрософт, а также лучшие решения от участников сообщества, посетите страницу Integration Services на сайтах MSDN или TechNet:

Чтобы получать автоматические уведомления об этих обновлениях, подпишитесь на RSS-каналы, предлагаемые на этой странице.