스트리밍 데이터와 함께 델타 테이블 사용

완료됨

지금까지 살펴본 데이터는 모두 파일의 정적 데이터였습니다. 그러나 많은 데이터 분석 시나리오에는 거의 실시간으로 처리해야 하는 스트리밍 데이터가 포함됩니다. 예를 들어 IoT(사물 인터넷) 디바이스에서 내보낸 판독값을 캡처하고 테이블에 저장해야 할 수 있습니다. Spark는 동일한 방식으로 일괄 처리 데이터 및 스트리밍 데이터를 처리하므로 동일한 API를 사용하여 스트리밍 데이터를 실시간으로 처리할 수 있습니다.

Spark Structured Streaming

일반적인 스트림 처리 솔루션에는 원본에서 데이터 스트림을 지속적으로 읽고 필요에 따라 처리하여 특정 필드를 선택하고, 값을 집계, 그룹화 또는 기타 방식으로 데이터를 조작하고, 결과를 싱크에 쓰는 작업이 포함됩니다.

Spark에는 처리를 위해 스트리밍 데이터가 캡처되는 무한한 데이터 프레임을 기반으로 하는 API인 Spark 구조적 스트리밍을 통한 데이터 스트리밍 기본 지원이 포함됩니다. Spark 구조적 스트리밍 데이터 프레임은 다음을 포함하여 다양한 종류의 스트리밍 원본에서 데이터를 읽을 수 있습니다.

  • 네트워크 포트
  • Azure Event Hubs 또는 Kafka와 같은 실시간 메시지 조정 서비스
  • 파일 시스템 위치입니다.

Spark 구조적 스트리밍에 대한 자세한 내용은 Spark 설명서에서 구조적 스트리밍 프로그래밍 가이드를 참조하세요.

델타 테이블을 사용한 스트리밍

Delta 테이블을 Spark 구조적 스트리밍에 대한 원본 또는 싱크로 사용할 수 있습니다. 예를 들어 IoT 디바이스에서 실시간 데이터 스트림을 캡처하고 싱크로 델타 테이블에 직접 스트림을 쓸 수 있습니다. 그런 다음 테이블을 쿼리하여 최신 스트리밍 데이터를 볼 수 있습니다. 또는 델타를 스트리밍 원본으로 읽어 테이블에 새 데이터가 추가되면 거의 실시간으로 보고할 수 있습니다.

델타 테이블을 스트리밍 원본으로 사용

다음 PySpark 예제에서는 인터넷 판매 주문의 세부 정보를 저장하기 위해 Delta 테이블을 만듭니다.

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

인터넷 주문의 가상 데이터 스트림이 orders_in 테이블에 삽입됩니다.

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

확인하려면 입력 테이블에서 데이터를 읽고 표시할 수 있습니다.

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

그러면 데이터가 델타 테이블에서 스트리밍 데이터 프레임으로 로드됩니다.

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

참고 항목

델타 테이블을 스트리밍 원본으로 사용하는 경우 추가 작업만 스트림에 포함할 수 있습니다. ignoreChanges 또는 ignoreDeletes 옵션을 지정하지 않을 경우 데이터 수정으로 인해 오류가 발생합니다.

True를 반환해야 하는 isStreaming 속성을 사용하여 스트림이 스트리밍되고 있는지 확인할 수 있습니다.

# Verify that the stream is streaming
stream_df.isStreaming

데이터 스트림 변환

델타 테이블에서 스트리밍 데이터 프레임으로 데이터를 읽은 후 Spark 구조적 스트리밍 API를 사용하여 처리할 수 있습니다. 예를 들어 1분마다 주문 수를 계산하고 거의 실시간 시각화를 위해 집계된 결과를 다운스트림 프로세스로 보낼 수 있습니다.

이 예제에서는 Price 열에 NULL이 있는 모든 행이 필터링되고 IsBike 및 Total에 대해 새 열이 추가됩니다.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

델타 테이블을 스트리밍 싱크로 사용

그러면 데이터 스트림이 델타 테이블에 기록됩니다.

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

참고 항목

checkpointLocation 옵션은 스트림 처리 상태를 추적하는 검사점 파일을 작성하는 데 사용됩니다. 이 파일을 사용하면 스트림 처리가 중단된 지점에서 장애로부터 복구할 수 있습니다.

스트리밍 프로세스가 시작된 후 Delta Lake 테이블을 쿼리하여 출력 테이블에 무엇이 있는지 확인할 수 있습니다. 테이블을 쿼리하기 전에 짧은 지연이 있을 수 있습니다.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

이 쿼리의 결과에서 Order 3005는 Price 열에 NULL이 있으므로 제외됩니다. 변환 중에 추가된 두 개의 열인 IsBike와 Total이 표시됩니다.

OrderID OrderDate 고객 Product 수량 가격 IsBike 총계
3001 2023-09-01 도로용 자전거 빨간색 1 1200 1 1200
3002 2023-09-01 Carlson 산악용 자전거 은색 1 1500 1 1500
3003 2023-09-02 윌슨 도로용 자전거 노란색 2 1350 1 2700
3004 2023-09-02 도로 앞바퀴 1 115 0 115

완료되면 스트리밍 데이터를 중지하여 stop 메서드를 사용하여 불필요한 처리 비용을 방지합니다.

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

스트리밍 데이터에 델타 테이블을 사용하는 방법에 대한 자세한 내용은 Delta Lake 설명서의 Table 스트리밍 읽기 및 쓰기를 참조하세요.