이 자습서는 15분이 소요되며 Dataflow Gen2를 사용하여 레이크하우스로 데이터를 증분 방식으로 모으는 방법을 설명합니다.
데이터 대상에서 데이터를 증분 방식으로 수집하려면 새 데이터 또는 업데이트된 데이터만 데이터 대상으로 로드하는 기술이 필요합니다. 이 기술은 쿼리를 사용하여 데이터 대상에 따라 데이터를 필터링하여 수행할 수 있습니다. 이 자습서에서는 OData 원본에서 레이크하우스로 데이터를 로드하는 데이터 흐름을 만드는 방법과 데이터 흐름에 쿼리를 추가하여 데이터 대상에 따라 데이터를 필터링하는 방법을 보여 줍니다.
이 자습서의 개략적인 단계는 다음과 같습니다.
- OData 원본에서 레이크하우스로 데이터를 로드하는 데이터 흐름을 만듭니다.
- 데이터 흐름에 쿼리를 추가하여 데이터 대상에 따라 데이터를 필터링합니다.
- (선택 사항) Notebook 및 파이프라인을 사용하여 데이터를 다시 로드합니다.
필수 조건
Microsoft Fabric 사용 작업 영역이 있어야 합니다. 아직 없는 경우 작업 영역 만들기를 참조하세요. 또한 이 자습서에서는 데이터 흐름 Gen2에서 다이어그램 보기를 사용 중이라고 가정합니다. 다이어그램 보기를 보고 있는지 확인하려면, 위쪽 리본에서 보기로 가서 다이어그램 보기가 선택되어 있는지 확인합니다.
OData 원본에서 레이크하우스로 데이터를 로드하는 데이터 흐름 만들기
이 구역에서는 OData 원본에서 레이크하우스로 데이터를 로드하는 데이터 흐름을 만듭니다.
작업 영역에서 새 레이크하우스 만들기.
작업 영역에 새 2세대 데이터 흐름 만들기.
데이터 흐름에 새 원본을 추가합니다. OData 원본을 선택하고 URL(
https://services.OData.org/V4/Northwind/Northwind.svc)을 입력합니다.
주문 테이블을 선택하고 다음을 선택합니다.
유지할 열을 선택하십시오.
OrderIDCustomerIDEmployeeIDOrderDateRequiredDateShippedDateShipViaFreightShipNameShipAddressShipCityShipRegionShipPostalCodeShipCountry
OrderDate,RequiredDate및ShippedDate에서datetime까지의 데이터 형식을 변경합니다.
다음 설정을 사용하여 레이크하우스에 데이터 대상을 설정합니다.
- 데이터 대상:
Lakehouse - 레이크하우스: 1단계에서 만든 레이크하우스를 선택합니다.
- 새 테이블명:
Orders - 업데이트 메서드:
Replace
- 데이터 대상:
다음을 선택하고 데이터 흐름을 게시합니다.
이제 OData 원본에서 레이크하우스로 데이터를 로드하는 데이터 흐름을 만들었습니다. 이 데이터 흐름은 다음 구역에서 데이터 흐름에 쿼리를 추가하여 데이터 대상에 따라 데이터를 필터링하는 데 사용됩니다. 그런 다음 데이터 흐름을 통해 노트북과 파이프라인을 사용하여 데이터를 다시 로드할 수 있습니다.
데이터 흐름에 쿼리를 추가하여 데이터 대상에 따라 데이터 필터링
이 구역에서는 데이터 흐름에 쿼리를 추가하여 대상 레이크하우스의 데이터를 기반으로 데이터를 필터링합니다. 쿼리는 데이터 흐름 새로 고침이 시작될 때 레이크하우스의 최대 OrderID(을)를 가져오고, 최대 OrderId를 사용하여 원본에서 OrderId가 더 높은 주문만 가져와 데이터 대상으로 추가합니다. 이 순서는 OrderID의 오름차순으로 원본에 추가된다고 가정합니다. 그렇지 않은 경우 다른 열을 사용하여 데이터를 필터링할 수 있습니다. 예를 들어 OrderDate 열을 사용하여 데이터를 필터할 수 있습니다.
주의
OData 필터는 데이터 원본에서 데이터를 받은 후 패브릭 내에서 적용되지만 SQL Server와 같은 데이터베이스 원본의 경우 필터는 백 엔드 데이터 원본에 제출된 쿼리에 적용되며 필터링된 행만 서비스에 반환됩니다.
데이터 흐름이 새로 고쳐지면 이전 구역에서 만든 데이터 흐름을 다시 엽니다.
IncrementalOrderID(으)로 명명된 새 쿼리를 만들고 이전 구역에서 만든 레이크하우스의 Orders 테이블에서 데이터를 가져옵니다.
이 쿼리의 스테이징을 사용하지 않도록 설정합니다.
데이터 미리 보기에서
OrderID열을 마우스 오른쪽 버튼으로 클릭하고 드릴다운을 선택합니다.
리본에서 목록 도구 ->통계 ->최대값을 선택합니다.
이제 레이크하우스에서 최대 OrderID를 반환하는 쿼리가 있습니다. 이 쿼리는 OData 원본에서 데이터를 필터링하는 데 사용됩니다. 다음 구역에서는 데이터 흐름에 쿼리를 추가하여 레이크하우스의 최대 OrderID를 기반으로 OData 원본의 데이터를 필터링합니다.
Orders 쿼리로 돌아가서 데이터를 필터링하는 새 단계를 추가합니다. 다음 설정을 사용합니다.
- 열:
OrderID - 작업:
Greater than - 값:
IncrementalOrderID매개 변수
- 열:
다음 대화 상자를 확인하여 OData 원본과 레이크하우스의 데이터를 결합할 수 있습니다.
다음 설정에 따라 데이터 목적지를 업데이트합니다.
- 업데이트 메서드:
Append
- 업데이트 메서드:
데이터 흐름을 게시합니다.
이제 데이터 흐름에는 레이크하우스의 최대 OrderID를 기반으로 OData 원본의 데이터를 필터링하는 쿼리가 포함됩니다. 즉, 새 데이터 또는 업데이트된 데이터만 레이크하우스에 로드됩니다. 다음 구역에서는 데이터 흐름을 사용하여 Notebook 및 파이프라인을 사용하여 데이터를 다시 로드합니다.
(선택 사항) Notebook 및 파이프라인을 사용하여 데이터 다시 로드
필요에 따라 Notebook 및 파이프라인을 사용하여 특정 데이터를 다시 로드할 수 있습니다. Notebook에서 사용자 지정 Python 코드를 사용하면 레이크하우스에서 이전 데이터를 제거합니다. 그런 다음 먼저 Notebook을 실행하고 데이터 흐름을 순차적으로 실행하는 파이프라인을 만들어 OData 원본의 데이터를 레이크하우스로 다시 로드합니다. Notebook은 여러 언어를 지원하지만 이 자습서에서는 PySpark를 사용합니다. Pyspark는 Spark용 Python API이며 이 자습서에서 Spark SQL 쿼리를 실행하는 데 사용됩니다.
작업 영역에서 새 Notebook 만들기.
Notebook에 다음 PySpark 코드를 추가합니다.
### Variables LakehouseName = "YOURLAKEHOUSE" TableName = "Orders" ColName = "OrderID" NumberOfOrdersToRemove = "10" ### Remove Old Orders Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect() Reload = Reload[0].ReLoadValue spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))Notebook을 실행하여 레이크하우스에서 데이터가 제거되었는지 확인합니다.
작업 영역에서 새 파이프라인을 만듭니다.
파이프라인에 새 Notebook 활동을 추가하고 이전 단계에서 만든 Notebook을 선택합니다.
파이프라인에 새 데이터 흐름 작업을 추가하고 이전 구역에서 만든 데이터 흐름을 선택합니다.
성공 트리거를 사용하여 Notebook 활동을 데이터 흐름 활동에 연결합니다.
파이프라인을 저장하고 실행합니다.
이제 레이크하우스에서 이전 데이터를 제거하고 OData 원본의 데이터를 레이크하우스로 다시 로드하는 파이프라인이 있습니다. 이 설정을 사용하면 OData 원본의 데이터를 정기적으로 레이크하우스로 다시 로드할 수 있습니다.