Azure Data Factory 및 Azure Synapse Analytics의 데이터 흐름 작업

적용 대상: Azure Data Factory Azure Synapse Analytics

기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!

데이터 흐름 작업을 사용하여 매핑 데이터 흐름을 통해 데이터를 변환하고 이동합니다. 데이터 흐름을 처음 접하는 경우 매핑 데이터 흐름 개요를 참조하세요.

UI를 사용하여 Data Flow 작업 만들기

파이프라인에서 Data Flow 작업을 사용하려면 다음 단계를 완료합니다.

  1. 파이프라인 작업 창에서 Data Flow를 검색하고 Data Flow 작업을 파이프라인 캔버스로 끌어옵니다.

  2. 아직 선택하지 않은 경우 캔버스에서 새 Data Flow 작업 및 해당 설정 탭을 선택하여 세부 정보를 편집합니다.

    Data Flow 작업에 대한 UI를 표시합니다.

  3. 검사점 키는 데이터 흐름이 변경된 데이터 캡처에 사용될 때 검사점을 설정하는 데 사용되며 덮어쓸 수 있습니다. 데이터 흐름 활동은 “pipeline name + activity name” 대신 guid 값을 검사점 키로 사용하므로 이름 바꾸기 작업이 있더라도 항상 고객의 변경 데이터 캡처 상태를 추적할 수 있습니다. 모든 기존 데이터 흐름 작업은 이전 버전과의 호환성을 위해 이전 패턴 키를 사용합니다. 변경 데이터 캡처를 사용하도록 설정한 데이터 흐름 리소스를 사용하여 새 데이터 흐름 작업을 게시한 후 검사점 키 옵션은 아래와 같습니다.

    검사점 키가 있는 Data Flow 작업의 UI를 표시합니다.

  4. 기존 데이터 흐름을 선택하거나 새로 만들기 단추를 사용하여 새로 만듭니다. 구성을 완료하는 데 필요한 다른 옵션을 선택합니다.

구문

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

형식 속성

속성 설명 허용된 값 필수
dataflow 실행되는 데이터 흐름에 대한 참조입니다. DataFlowReference
integrationRuntime 데이터 흐름이 실행되는 컴퓨팅 환경입니다. 지정하지 않으면 자동 해결 Azure Integration Runtime이 사용됩니다. IntegrationRuntimeReference 아니요
compute.coreCount Spark 클러스터에서 사용되는 코어 수입니다. 자동 확인 Azure 통합 런타임이 사용되는 경우에만 지정할 수 있습니다. 8, 16, 32, 48, 80, 144, 272 아니요
compute.computeType Spark 클러스터에서 사용되는 컴퓨팅 유형입니다. 자동 확인 Azure 통합 런타임이 사용되는 경우에만 지정할 수 있습니다. "일반" 아니요
staging.linkedService Azure Synapse Analytics 원본 또는 싱크를 사용하는 경우 PolyBase 스테이징에 사용되는 스토리지 계정을 지정합니다.

Azure Storage가 VNet 서비스 엔드포인트로 구성된 경우 스토리지 계정에 "신뢰할 수 있는 Microsoft 서비스 허용"이 사용하도록 설정된 관리 ID 인증을 사용해야 합니다. Azure Storage에서 VNet 서비스 엔드포인트 사용의 영향을 참조하세요. 또한 Azure BlobAzure Data Lake Storage Gen2에 대해 각각 필요한 구성을 알아봅니다.
LinkedServiceReference 데이터 흐름이 Azure Synapse Analytics에서 읽거나 쓰는 경우에만
staging.folderPath Azure Synapse Analytics 원본 또는 싱크를 사용하는 경우 PolyBase 스테이징에 사용되는 Blob Storage 계정의 폴더 경로입니다. 문자열 데이터 흐름이 Azure Synapse Analytics에서 읽거나 쓰는 경우에만
traceLevel 데이터 흐름 작업 실행의 로깅 수준을 설정합니다. Fine, Coarse, None 아니요

데이터 흐름 실행

런타임에 동적으로 데이터 흐름 컴퓨팅 크기 설정

코어 수 및 컴퓨팅 유형 속성을 런타임에 들어오는 원본 데이터의 크기에 맞춰 동적으로 설정할 수 있습니다. 조회 또는 메타데이터 가져오기와 같은 파이프라인 작업을 사용하여 원본 데이터 세트 데이터의 크기를 찾습니다. 그런 다음 데이터 흐름 작업 속성에서 동적 콘텐츠 추가를 사용합니다. 소형, 중형 또는 대형 컴퓨팅 크기를 선택할 수 있습니다. 필요한 경우 "사용자 지정"을 선택하고 컴퓨팅 형식 및 코어 수를 수동으로 구성합니다.

동적 데이터 흐름

다음은 이 기술을 설명하는 간단한 동영상 자습서입니다.

데이터 흐름 통합 런타임

데이터 흐름 작업 실행에 사용할 통합 런타임을 선택합니다. 기본값으로 서비스는 4개의 작업자 코어와 함께 Azure Integration Runtime 자동 해결을 사용합니다. 이 IR은 범용 컴퓨팅 유형을 사용하며 서비스 인스턴스와 동일한 지역에서 실행됩니다. 작업 가능 파이프라인의 경우, 데이터 흐름 활동 실행을 위한 특정 지역, 계산 유형, 코어 수, TTL을 정의하는 고유한 Azure Integration Runtime을 만드는 것이 좋습니다.

8+8(총 16개 v-코어) 구성의 범용 최소 컴퓨팅 형식 및 10분 TTL(Time to live)이 대부분의 프로덕션 워크로드에 대한 최소 권장 사항입니다. Azure IR는 소형 TTL을 설정하여 콜드 클러스터에 대해 몇 분의 시작 시간을 발생시키지 않는 웜 클러스터를 유지할 수 있습니다. 자세한 내용은 Azure 통합 런타임을 참조하세요.

Azure Integration Runtime

Important

데이터 흐름 작업의 통합 런타임 선택은 파이프라인의 '트리거된 실행'에만 적용됩니다. 데이터 흐름을 사용하는 파이프라인 디버깅은 디버그 세션에 지정된 클러스터에서 실행됩니다.

PolyBase

Azure Synapse Analytics를 싱크 또는 원본으로 사용하는 경우 PolyBase 일괄 처리 로드를 위한 스테이징 위치를 선택해야 합니다. PolyBase에서는 데이터를 행 단위로 로드하는 대신 일괄 처리로 대량 로드할 수 있습니다. PolyBase는 Azure Synapse Analytics로의 로드 시간을 대폭 단축합니다.

검사점 키

데이터 흐름 원본에 대한 변경 캡처 옵션을 사용하는 경우 ADF는 자동으로 검사점을 유지하고 관리합니다. 기본 검사점 키는 데이터 흐름 이름 및 파이프라인 이름의 해시입니다. 원본 테이블 또는 폴더에 동적 패턴을 사용하는 경우 이 해시를 재정의하고 여기에서 고유한 검사점 키 값을 설정해야 합니다.

로깅 수준

데이터 흐름 활동의 모든 파이프라인 실행에서 모든 자세한 원격 분석 로그를 완전히 기록할 필요가 없는 경우 필요에 따라 로깅 수준을 '기본' 또는 '없음'으로 설정할 수 있습니다. '자세한 정보' 모드(기본값)에서 데이터 흐름을 실행하는 경우 서비스에 데이터 변환 중 각 개별 파티션 수준에서 작업을 완전히 기록하도록 요청하는 것입니다. 이 작업은 비용이 많이 들 수 있으므로 문제를 해결하는 경우에만 자세한 정보를 사용하면 전체 데이터 흐름 및 파이프라인 성능을 향상시킬 수 있습니다. "기본" 모드는 변환 기간만 기록하고 "없음" 모드는 기간 요약만 제공합니다.

로깅 수준

싱크 속성

데이터 흐름의 그룹화 기능을 사용하면 싱크의 실행 순서를 설정할 수 있고 동일한 그룹 번호를 사용하여 싱크를 그룹화할 수 있습니다. 그룹을 쉽게 관리할 수 있도록 ADF에게 동일한 그룹의 싱크를 동시에 실행하도록 요청할 수 있습니다. 또한 싱크 중 하나에 오류가 발생한 후에도 계속하도록 싱크 그룹을 설정할 수도 있습니다.

데이터 흐름 싱크의 기본 동작은 각 싱크를 순차적으로 실행하고 싱크에서 오류가 발생할 경우 데이터 흐름이 실패하도록 하는 것입니다. 또한 데이터 흐름 속성으로 이동하여 싱크에 대해 다른 우선 순위를 설정하지 않는 한 모든 싱크는 기본적으로 동일한 그룹으로 지정됩니다.

싱크 속성

첫 행 전용

이 옵션은 “작업에 출력”에 대해 캐시 싱크를 사용하는 것으로 설정된 데이터 흐름에만 사용할 수 있습니다. 파이프라인에 직접 삽입되는 데이터 흐름의 출력은 2MB로 제한됩니다. “첫 행 전용”을 설정하면 데이터 흐름 활동 출력을 파이프라인에 직접 삽입하는 경우 데이터 흐름에서 데이터 출력을 제한할 수 있습니다.

데이터 흐름 매개 변수화

매개 변수가 있는 데이터 세트

데이터 흐름에서 매개 변수가 있는 데이터 세트를 사용하는 경우 설정 탭에서 매개 변수 값을 설정합니다.

데이터 흐름 실행 매개 변수

매개 변수가 있는 데이터 흐름

데이터 흐름이 매개 변수가 있는 경우 매개 변수 탭에서 데이터 흐름 매개 변수의 동적 값을 설정합니다. 파이프라인 식 언어나 데이터 흐름 식 언어를 사용하여 동적 또는 리터럴 매개 변수 값을 할당할 수 있습니다. 자세한 내용은 데이터 흐름 매개 변수를 참조하세요.

매개 변수가 있는 컴퓨팅 속성

자동 확인 Azure 통합 런타임을 사용하고 compute.coreCount 및 compute.computeType의 값을 지정하는 경우 코어 수 또는 컴퓨팅 유형을 매개 변수화할 수 있습니다.

데이터 흐름 실행 매개 변수 예제

데이터 흐름 작업의 파이프라인 디버그

데이터 흐름 작업을 사용하여 파이프라인 디버그를 실행하려면 위쪽 표시줄의 데이터 흐름 디버그 슬라이더를 통해 데이터 흐름 디버그 모드로 전환해야 합니다. 디버그 모드를 사용하면 활성 Spark 클러스터에 대해 데이터 흐름을 실행할 수 있습니다. 자세한 내용은 디버그 모드를 참조하세요.

디버그 단추를 표시하는 스크린샷

디버그 파이프라인은 데이터 흐름 작업 설정에 지정된 통합 런타임 환경이 아닌 활성 디버그 클러스터에 대해 실행됩니다. 디버그 모드를 시작할 때 디버그 컴퓨팅 환경을 선택할 수 있습니다.

데이터 흐름 작업 모니터링

데이터 흐름 작업은 분할, 단계 시간 및 데이터 계보 정보를 볼 수 있는 특별한 모니터링 환경을 포함합니다. 작업 아래의 안경 아이콘을 통해 모니터링 창을 엽니다. 자세한 내용은 데이터 흐름 모니터링을 참조하세요.

데이터 흐름 작업 결과를 후속 작업에 사용

데이터 흐름 작업은 각 싱크에 쓰여진 행 수와 각 원본에서 읽은 행 수에 대한 메트릭을 출력합니다. 이러한 결과는 활동 실행 결과의 output 섹션에 반환됩니다. 반환되는 메트릭은 아래 JSON 형식으로 되어 있습니다.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

예를 들어 이름이 'dataflowActivity'인 작업에서 'sink1'이라는 싱크에 기록된 행 수를 가져오려면 @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten을 사용합니다.

해당 싱크에 사용된 'source1'이라는 원본에서 읽은 행 수를 가져오려면 @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead를 사용합니다.

참고 항목

싱크에 작성된 행이 0개인 경우 메트릭에 표시되지 않습니다. contains 함수를 사용하여 존재 여부를 확인할 수 있습니다. 예를 들어, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1')은 sing1에 행이 기록되었는지 확인합니다.

지원되는 다음 제어 흐름 작업을 참조하세요.