你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

映射数据流中的外部调用转换

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

数据流在 Azure 数据工厂和 Azure Synapse 管道中均可用。 本文适用于映射数据流。 如果不熟悉转换,请参阅介绍性文章使用映射数据流转换数据

利用外部调用转换,数据工程师能够逐行调用外部 REST 终结点,以便将自定义或第三方结果添加到数据流。

配置

在外部调用转换配置窗格,你将首先选择要连接的外部终结点类型。 下一步是映射传入列。 最后,定义下游转换使用的输出数据结构。

External call

设置

选择内联数据集类型和关联的链接服务。 目前,仅支持 REST。 但是,SQL 存储过程和其他链接服务类型也将可用。 有关设置属性的说明,请参阅 REST 源配置

映射

可以选择自动映射以将所有输入列传递到终结点。 或者,可以手动设置列,并重命名在此处发送到目标终结点的列。

输出

这就是你将为外部调用的输出定义数据结构的位置。 可以定义正文的结构,并选择如何存储从外部调用返回的标头和状态。

如果选择存储正文、标头和状态,请首先为每个名称选择一个列名称,以便下游数据转换可以使用它们。

可以使用 ADF 数据流语法手动定义正文数据结构。 若要定义正文的列名和数据类型,请单击“导入投影”并允许 ADF 检测来自外部调用的架构输出。 下面是一个示例架构定义结构,作为天气 REST API GET 的输出:

({@context} as string[],
		geometry as (coordinates as string[][][],
		type as string),
		properties as (elevation as (unitCode as string,
		value as string),
		forecastGenerator as string,
		generatedAt as string,
		periods as (detailedForecast as string, endTime as string, icon as string, isDaytime as string, name as string, number as string, shortForecast as string, startTime as string, temperature as string, temperatureTrend as string, temperatureUnit as string, windDirection as string, windSpeed as string)[],
		units as string,
		updateTime as string,
		updated as string,
		validTimes as string),
		type as string)

示例

示例包括数据流脚本

External call sample

source(output(
		id as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	ignoreNoFilesFound: false) ~> source1
Filter1 call(mapColumn(
		id
	),
	skipDuplicateMapInputs: false,
	skipDuplicateMapOutputs: false,
	output(
		headers as [string,string],
		body as (name as string)
	),
	allowSchemaDrift: true,
	store: 'restservice',
	format: 'rest',
	timeout: 30,
	httpMethod: 'POST',
	entity: 'api/Todo/',
	requestFormat: ['type' -> 'json'],
	responseFormat: ['type' -> 'json', 'documentForm' -> 'documentPerLine']) ~> ExternalCall1
source1 filter(toInteger(id)==1) ~> Filter1
ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1

数据流脚本

ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1