Fabric Data Warehouse용 Spark 커넥터를 사용하면 Spark 개발자와 데이터 과학자가 레이크하우스의 웨어하우스 및 SQL 분석 엔드포인트에서 데이터에 액세스하고 작업할 수 있습니다. 커넥터는 다음과 같은 기능을 제공합니다.
- 동일한 작업 영역 또는 여러 작업 영역에서 Warehouse 또는 SQL 분석 엔드포인트의 데이터로 작업할 수 있습니다.
- Lakehouse의 SQL 분석 엔드포인트는 작업 영역 컨텍스트에 따라 자동으로 검색됩니다.
- 커넥터에는 간소화된 Spark API가 있고, 기본 복잡성을 추상화하며, 한 줄의 코드로만 작동합니다.
- 테이블 또는 뷰에 액세스하는 동안 커넥터는 SQL 엔진 수준에서 정의된 보안 모델을 유지합니다. 이러한 모델에는 OLS(개체 수준 보안), RLS(행 수준 보안) 및 CLS(열 수준 보안)가 포함됩니다.
- 커넥터는 Fabric 런타임 내에 미리 설치되어 있으므로 별도의 설치가 필요하지 않습니다.
인증
Microsoft Entra 인증은 통합 인증 방식입니다. 사용자가 Microsoft Fabric 작업 영역에 로그인하면 해당 자격 증명이 인증 및 권한 부여를 위해 SQL 엔진에 자동으로 전달됩니다. 자격 증명이 자동으로 매핑되므로 사용자는 특정 구성 옵션을 제공할 필요가 없습니다.
사용 권한
SQL 엔진에 연결하려면 Warehouse 또는 항목 수준의 SQL 분석 엔드포인트에 대한 읽기 권한(이는 SQL Server의 CONNECT 권한과 유사함)이 필요합니다. 또한 사용자는 특정 테이블 또는 뷰에서 데이터를 읽으려면 세분화된 개체 수준 권한이 필요합니다. 자세한 내용은 Microsoft Fabric의 데이터 웨어하우징에 대한 보안을 참조 하세요.
코드 템플릿 및 예제
메서드 서명 사용
다음 명령은 읽기 요청에 대한 synapsesql
메서드 서명을 보여줍니다. 세 부분으로 구성된 tableName
인수는 Warehouse 및 Lakehouse의 SQL 분석 엔드포인트에서 테이블 또는 뷰에 액세스하는 데 필요합니다. 시나리오에 따라 다음 이름으로 인수를 업데이트합니다.
- 1부: 창고 또는 레이크하우스의 이름.
- 파트 2: 스키마의 이름입니다.
- 파트 3: 테이블 또는 뷰 이름입니다.
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
테이블 또는 뷰에서 직접 읽는 것 외에도 이 커넥터를 사용하면 SQL 엔진에 전달되고 결과가 Spark로 다시 반환되는 사용자 지정 또는 통과 쿼리를 지정할 수 있습니다.
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
이 커넥터는 지정된 Warehouse/Lakehouse에 대한 엔드포인트를 자동으로 검색하지만 명시적으로 지정하려면 이 작업을 수행할 수 있습니다.
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
동일한 작업 영역 내의 데이터 읽기
중요한
노트북 시작 시 또는 커넥터 사용 전 다음 import 문을 실행하십시오.
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
다음 코드는 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽는 예제입니다.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
다음 코드는 행 개수 제한이 10인 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽는 예제입니다.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
다음 코드는 필터를 적용한 후 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽는 예제입니다.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
다음 코드는 선택한 열에 대해서만 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽는 예제입니다.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
작업 영역에서 데이터를 읽습니다.
작업 영역의 창고 또는 레이크하우스에서 데이터에 액세스하고 읽으려면 창고 또는 레이크하우스가 있는 작업 영역 ID를 지정한 다음, 레이크하우스 또는 창고 항목 ID를 지정할 수 있습니다. 다음 줄에서는 지정된 작업 영역 ID와 lakehouse/warehouse ID를 사용하여 웨어하우스 또는 레이크하우스에서 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽는 예제를 제공합니다.
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
Note
Notebook을 실행하는 경우 기본적으로 커넥터는 Notebook에 연결된 레이크하우스의 작업 영역에서 지정된 창고 또는 레이크하우스를 찾습니다. 다른 작업 영역에서 창고 또는 레이크하우스를 참조하려면 위와 같이 작업 영역 ID와 레이크하우스 또는 웨어하우스 항목 ID를 지정합니다.
창고 데이터를 기반으로 레이크하우스 테이블 만들기
이러한 코드 줄은 Spark DataFrame의 테이블 또는 뷰에서 데이터를 읽고 이를 사용하여 Lakehouse 테이블을 만드는 예제를 제공합니다.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
웨어하우스 테이블에 Spark 데이터 프레임 데이터 쓰기
이 커넥터는 패브릭 DW 테이블에 2단계 쓰기 프로세스를 사용합니다. 처음에는 Spark 데이터 프레임 데이터를 중간 스토리지로 단계화한 다음, 명령을 사용하여 COPY INTO
데이터를 Fabric DW 테이블에 수집합니다. 이 방법은 데이터 볼륨을 늘리면서 확장성을 보장합니다.
지원되는 DataFrame 저장 모드
데이터 프레임의 원본 데이터를 웨어하우스의 대상 테이블에 쓸 때 지원되는 저장 모드는 다음과 같습니다.
- ErrorIfExists(기본 저장 모드): 대상 테이블이 존재하면, 호출자에게 예외가 반환되어 쓰기가 중단됩니다. 그렇지 않으면 데이터를 사용하여 새 테이블이 만들어집니다.
- 무시: 대상 테이블이 있는 경우 쓰기는 오류를 반환하지 않고 쓰기 요청을 무시합니다. 그렇지 않으면 데이터를 사용하여 새 테이블이 만들어집니다.
- 덮어쓰기: 대상 테이블이 있는 경우 대상의 기존 데이터가 데이터로 바뀝니다. 그렇지 않으면 데이터를 사용하여 새 테이블이 만들어집니다.
- 추가: 대상 테이블이 있는 경우 새 데이터가 추가됩니다. 그렇지 않으면 데이터를 사용하여 새 테이블이 만들어집니다.
다음 코드는 Fabric DW 테이블에 Spark 데이터 프레임의 데이터를 쓰는 예제를 보여 줍니다.
df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists
df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
Note
커넥터는 Lakehouse의 SQL 분석 엔드포인트가 읽기 전용이므로 Fabric DW 테이블에 쓰기만 지원합니다.
문제 해결
완료되면 셀의 출력에 읽기 응답 스니펫이 나타납니다. 현재 셀에서 실패하면 노트북의 후속 셀 실행도 취소됩니다. 자세한 오류 정보는 Spark 애플리케이션 로그에서 사용할 수 있습니다.
이 커넥터 사용에 대한 고려 사항
현재 커넥터는 다음과 같습니다.
- Supports data retrieval or read from Fabric warehouses and SQL analytics endpoints of lakehouse items.
- 다른 저장 모드를 사용하여 웨어하우스 테이블에 데이터 쓰기를 지원합니다. 최신 GA 런타임(예: 런타임 1.3)에서만 사용할 수 있습니다. 또한 현재
Private Link
이(가) 사용 설정되고Public Access
이(가) 차단된 경우, 쓰기 작업이 작동하지 않습니다. - 이제 Fabric DW는
Time Travel
를 지원하지만 이 커넥터는 시간 이동 구문을 사용하는 쿼리에는 작동하지 않습니다. - 일관성을 위해 Azure Synapse Analytics용 Apache Spark와 함께 제공되는 것과 같은 사용 서명을 유지합니다. 그러나 Azure Synapse Analytics에서 전용 SQL 풀에 연결하고 작업하는 것은 이전 버전과 호환되지 않습니다.
- 특수 문자가 있는 열 이름은 파트 3의 테이블/뷰 이름에 따라 쿼리가 제출되기 전에 이스케이프 문자를 추가하여 처리됩니다. In case of a custom or passthrough-query based read, users are required to escape column names that would contain special characters.