次の方法で共有


マッピング データ フローの外部呼び出し変換

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

データ フローは、Azure Data Factory および Azure Synapse Pipelines の両方で使用できます。 この記事は、マッピング データ フローに適用されます。 変換を初めて使用する場合は、概要の記事「マッピング データ フローを使用してデータを変換する」を参照してください。

外部呼び出し変換を使用すると、データ エンジニアは、カスタムまたはサード パーティの結果をデータ フロー ストリームに追加するために、外部の REST エンドポイントを行単位で呼び出すことができます。

構成

外部呼び出し変換の構成パネルで、最初に、接続先となる外部エンドポイントの種類を選択します。 次に、受信列をマップします。 最後に、ダウンストリームの変換で使用する出力データ構造を定義します。

外部呼び出し

設定

インライン データセットの種類と、関連するリンクされたサービスを選択します。 現在、REST のみがサポートされています。 ただし、SQL ストアド プロシージャやその他のリンクされたサービスの種類も使用できるようになる予定です。 設定のプロパティの説明については、REST ソースの構成に関するページを参照してください。

マッピング

自動マッピングを選択して、すべての入力列をエンドポイントに渡すことができます。 ここでは、必要に応じて、手動で列を設定し、ターゲット エンドポイントに送信される列の名前を変更することもできます。

出力

ここでは、外部呼び出しの出力のデータ構造を定義します。 本体の構造を定義でき、さらに外部呼び出しから返されるヘッダーと状態の保存方法を選択できます。

本体、ヘッダー、状態を保存することを選択する場合、それらをダウンストリーム データ変換で使用できるようにするために、最初にそれぞれの列名を選択します。

ADF データ フロー構文を使用して、本体データ構造を手動で定義できます。 本体の列名とデータ型を定義するには、[Import projection] (プロジェクションのインポート) をクリックし、ADF で外部呼び出しからスキーマ出力を検出できるようにします。 気象 REST API GET 呼び出しの出力としてのスキーマ定義構造の例を次に示します。

({@context} as string[],
		geometry as (coordinates as string[][][],
		type as string),
		properties as (elevation as (unitCode as string,
		value as string),
		forecastGenerator as string,
		generatedAt as string,
		periods as (detailedForecast as string, endTime as string, icon as string, isDaytime as string, name as string, number as string, shortForecast as string, startTime as string, temperature as string, temperatureTrend as string, temperatureUnit as string, windDirection as string, windSpeed as string)[],
		units as string,
		updateTime as string,
		updated as string,
		validTimes as string),
		type as string)

データ フロー スクリプトを含むサンプル

外部呼び出しのサンプル

source(output(
		id as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	ignoreNoFilesFound: false) ~> source1
Filter1 call(mapColumn(
		id
	),
	skipDuplicateMapInputs: false,
	skipDuplicateMapOutputs: false,
	output(
		headers as [string,string],
		body as (name as string)
	),
	allowSchemaDrift: true,
	store: 'restservice',
	format: 'rest',
	timeout: 30,
	httpMethod: 'POST',
	entity: 'api/Todo/',
	requestFormat: ['type' -> 'json'],
	responseFormat: ['type' -> 'json', 'documentForm' -> 'documentPerLine']) ~> ExternalCall1
source1 filter(toInteger(id)==1) ~> Filter1
ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1

データ フローのスクリプト

ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1