온라인 레저 및 여행 예약에서 임시 쿼리를 지원하는 Delta Lake 빌드

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

이 아키텍처는 대량의 원시 문서가 높은 빈도로 생성되는 여행 예약에 대한 델타 레이크 예제를 제공합니다.

Apache® 및 Apache Spark™는 미국 및/또는 기타 국가에서 Apache Software Foundation의 등록 상표 또는 상표입니다. 이러한 표시의 사용은 Apache Software Foundation에 의한 보증을 암시하지 않습니다.

아키텍처

Delta Lake 아키텍처 다이어그램

이 아키텍처의 Visio 파일을 다운로드합니다.

레저 및 여행 예약 시나리오는 대량의 원시 문서를 높은 빈도로 생성할 수 있습니다. 그러나 이러한 문서의 전체 내용을 인덱싱할 필요는 없습니다. 예를 들어 사용자가 관심 있는 문서 세트를 검색하기 위해 알려진 트랜잭션 ID 또는 특정 날짜의 고객 이름으로 검색해야 할 수 있습니다.

데이터 흐름

이 아키텍처의 기본 개념은 원시 데이터에서 검색하는 데 유용한 메타데이터를 분리하는 것입니다.

  • 메타데이터만 쿼리 가능한 서비스(예: Spark) 내에서 인덱싱되고 실제 데이터는 데이터 레이크에 저장됩니다.
  • 데이터 레이크의 원시 문서는 해당 경로를 통해 인덱싱된 메타데이터에 연결됩니다.
  • 문서를 쿼리하면 서비스에서 문서의 메타데이터를 검색하고, 이에 따라 해당 경로를 통해 데이터 레이크에서 실제 문서를 검색합니다.

메타데이터가 전체 데이터 자산의 일부로 구성되므로 이 솔루션은 비용을 크게 낮추고 성능을 향상시킵니다(예: 페타바이트의 원시 문서를 수십 기가바이트의 간결한 메타데이터로 설명할 수 있음).

또한 이 유형의 시나리오에서 일반적으로 해결해야 할 과제는 기록 깊이와 실시간 요구 사항을 균일하고 유지 관리하기 쉬운 고성능 시스템으로의 혼합을 관리하는 것입니다. Delta Lake 아키텍처는 이 과제에 응답합니다.

구성 요소

Azure App Service는 관리되는 가상 머신에서 앱을 빌드하고 호스트하는 PaaS(Platform as a Service)입니다. App Service는 앱이 실행되는 기본 컴퓨팅 인프라를 관리하고 리소스 사용 할당량 및 앱 메트릭 모니터링, 진단 정보 로깅 및 메트릭 기반 경고를 제공합니다.

Azure Data Factory는 스케일 아웃 서버리스 데이터 통합 및 데이터 변환을 위한 Azure의 클라우드 ETL(추출, 변환 및 로드) 서비스입니다. 코드가 필요 없는 UI로 직관적 작성 및 단일 창을 통한 모니터링 및 관리를 지원합니다. 또한 기존 SSIS(SQL Server Integration Services) 패키지를 Azure로 리프트 앤 시프트하고 Azure Data Factory에서 완전한 호환성을 통해 실행할 수 있습니다.

Azure Data Lake Storage Gen2는 Azure Blob Storage를 기반으로 하는 빅 데이터 분석 전용의 기능 세트입니다. Data Lake Storage Gen2는 Azure Data Lake Storage Gen1의 기능을 Azure Blob Storage와 통합합니다. 예를 들어 Data Lake Storage Gen2는 파일 시스템 의미 체계, 파일 수준 보안 및 확장을 제공합니다. 이러한 기능은 Blob Storage를 기반으로 하므로 고가용성/재해 복구 기능을 갖춘 계층화된 저렴한 스토리지를 가져올 수도 있습니다.

Azure Event Hubs는 간단하고 신뢰할 수 있으며 확장성 있는 완전 관리형 실시간 데이터 수집 서비스입니다. 모든 원본에서 초당 수백만 개의 이벤트를 스트리밍하여 동적 데이터 파이프라인을 빌드하고 비즈니스 문제에 즉시 대응합니다.

Azure Databricks는 Microsoft Azure Cloud Services에 최적화된 Apache Spark 기반 데이터 분석 플랫폼입니다. Azure Databricks는 데이터를 많이 사용하는 애플리케이션을 개발하기 위한 세 가지 환경, 즉 Databricks SQL, Databricks 데이터 과학 및 엔지니어링, Databricks Machine Learning을 제공합니다.

대안

메타데이터만 인덱싱하는 대신 쿼리 기능을 제공하는 서비스(예: Azure Databricks, Azure Synapse Analytics, Azure Cognitive Search 또는 Azure Data Explorer)에서 모든 원시 데이터를 인덱싱할 수 있습니다. 이 방법은 더 즉각적이지만, 특히 비용 측면에서 데이터 크기, 성능 요구 사항 및 업데이트 빈도가 결합된 효과에 주의를 기울여야 합니다.

델타 레이크를 사용하는 것과는 달리 람다 아키텍처를 사용하면 기록 데이터가 아닌 실시간 데이터를 다른 리포지토리에 유지하고 클라이언트에서 논리를 실행하여 다른 유형의 쿼리를 사용자에게 투명하게 만듭니다. 이 솔루션의 장점은 사용할 수 있는 더 큰 서비스 세트(예: Azure Stream Analytics 및 Azure SQL Database)이지만 아키텍처가 더 복잡해지고 코드베이스를 유지 관리하는 데 더 많은 비용이 듭니다.

Spark는 Azure Databricks, Azure Synapse AnalyticsAzure HDInsight와 함께 배포됩니다. 따라서 이 아키텍처는 이러한 Azure 데이터 서비스 중 하나, 바람직하게는 Delta Lake 0.8 또는 1.0을 지원하는 최신 Spark 버전을 사용하여 구현할 수 있습니다.

시나리오 정보

레저 및 여행 예약 시나리오에서 원시 데이터의 가시성은 여러 작업자에게 중요합니다. 기술 지원 팀은 실시간 진단을 감독하여 트랜잭션 처리를 지속적으로 모니터링하고 원하지 않는 문제에 빠르게 대응합니다. 데이터 엔지니어는 관련자 검토를 위해 데이터 내보내기를 감독하고 분석을 실시간으로 공급합니다. 고객 지원 팀에는 고객 문의 및 불만을 처리하기 위해 기록 및 최근 데이터가 필요합니다. 마지막으로 법률 팀은 규정 준수 의무가 준수되고 법적 조치가 수행되도록 보장합니다. 이러한 유형의 요구 사항은 외부 공급자를 집계하고 사용자 구매를 관리하는 마켓플레이스에서 일반적입니다. 예를 들어 레저 및 여행 예약 시스템은 서비스 검색, 공급자의 의미 있는 제안 집계, 사용자 예약 관리를 위해 사용자와 서비스 공급자를 중개하지 않습니다.

서비스 공급자, B2B 사용자 및 B2C 사용자가 있는 마켓플레이스의 다이어그램

잠재적인 사용 사례

이 아키텍처는 여행 및 서비스 산업에 적합합니다. 적용할 수 있는 시나리오는 다음과 같습니다.

  • 실시간(진단용) 또는 기록(규정 준수용) 원시 문서를 원래 형식으로 빠르게 검색
  • 페타바이트 규모의 데이터 관리
  • 실시간 진단에 대한 초 범위 성능 보장
  • 실시간 진단, 기록 쿼리 및 피드 분석에 대한 통합 방법 달성
  • 다운스트림 실시간 분석 제공
  • 비용 제어
  • 데이터를 원시 문서(예: json, xml 또는 csv 파일)로 인소싱
  • 데이터의 일부가 쿼리를 설명하는 데 충분한 경우
  • 사용자가 전체 원시 문서를 검색하려는 경우
  • 총 데이터 크기가 목표 가격보다 높은 시스템의 크기를 조정해야 하는 경우

이 아키텍처가 적합하지 않을 수 있는 경우는 다음과 같습니다.

  • 데이터는 레코드 집합으로 인소싱됩니다.
  • 사용자는 분석을 실행해야 합니다.
  • 사용자가 자신의 패키지 BI 도구를 적극적으로 사용합니다.
  • 데이터 크기가 비용 관점에서 문제가 되지 않습니다.

원시 문서는 반드시 필요한 것이 아닙니다.

고려 사항

이러한 고려 사항은 워크로드의 품질을 향상시키는 데 사용할 수 있는 일단의 지침 원칙인 Azure Well-Architected Framework의 핵심 요소를 구현합니다. 자세한 내용은 Microsoft Azure Well-Architected Framework를 참조하세요.

성능 효율성

성능 효율성은 사용자가 배치된 요구 사항을 효율적인 방식으로 충족하기 위해 워크로드의 크기를 조정할 수 있는 기능입니다. 자세한 내용은 성능 효율성 핵심 요소 개요를 참조하세요.

사용자는 데이터에 액세스하기 위해 이중 홉을 수행합니다. 먼저 메타데이터를 쿼리한 다음, 원하는 문서 세트를 검색합니다. 기존 또는 패키지된 클라이언트 자산을 다시 사용하기가 어려울 수 있습니다.

Azure Data Lake Storage Gen2는 핫, 쿨 및 보관의 세 가지 액세스 계층을 제공합니다. 문서를 가끔 검색하는 시나리오에서 쿨 성능 계층은 핫 성능 계층과 비슷한 성능을 보장하지만 비용이 절감된다는 장점이 있습니다. 최신 데이터를 사용하여 검색 가능성이 더 높은 시나리오에서는 쿨 계층과 핫 계층을 혼합하는 것이 좋습니다. 보관 계층 스토리지를 사용하면 하드 삭제에 대한 대안을 제공할 수 있을 뿐만 아니라 의미 있는 정보 또는 더 많이 집계된 데이터만 유지하여 데이터 크기를 줄일 수 있습니다.

데이터 레이크는 잠재적으로 페타바이트 규모의 데이터를 관리하므로 일반적으로 데이터 보존 정책이 적용됩니다. 데이터 거버넌스 솔루션은 핫 및 쿨 스토리지 계층 간에 이전 데이터를 이동하는 시기, 이전 데이터를 삭제하거나 보관하는 시기, 정보를 다운스트림 분석 솔루션으로 집계하는 시기와 같은 데이터 수명 주기를 관리하는 데 사용해야 합니다.

이 방법이 다운스트림 분석 시나리오에서 작동하는 방식을 고려합니다. 이 예제 워크로드는 분석을 위한 것이 아니지만 다운스트림 실시간 분석을 공급하는 데 적합하며, 일괄 처리 시나리오는 데이터 레이크에서 대신 공급할 수 있습니다.

확장성

Azure Event Hubs는 진단 및 규정 준수 시스템에서 원시 문서를 생성하는 트랜잭션 시스템을 분리할 때 매우 다양한 기능을 수행하며, 이미 설정된 아키텍처에서 구현하기 쉽고, 궁극적으로 사용하기도 쉽습니다. 그러나 트랜잭션 시스템은 이미 스트리밍 패턴을 사용하여 들어오는 문서를 처리할 수 있습니다. 이 경우 진단 및 규정 준수를 관리하기 위한 논리를 하위 스트림으로 스트리밍 애플리케이션에 통합해야 할 수 있습니다.

DevOps

이 예제 워크로드에서 사용되는 서비스를 자동으로 배포하려면 CI/CD(연속 통합 및 지속적인 배포) 프로세스를 사용하는 것이 가장 좋습니다. Azure DevOps 또는 GitHub Actions와 같은 솔루션을 사용하는 것이 좋습니다.

비용 최적화

비용 최적화는 불필요한 비용을 줄이고 운영 효율성을 높이는 방법을 찾는 것입니다. 자세한 내용은 비용 최적화 핵심 요소 개요를 참조하세요.

일반적으로 Azure 가격 계산기를 사용하여 비용을 예측합니다. 다른 고려 사항에 대해 알아보려면 Microsoft Azure Well-Architected Framework의 비용 섹션을 참조하세요.

시나리오 배포

다음 예제 아키텍처에서는 구조화된 원시 문서(예: json 또는 xml 파일)가 하나 이상의 Azure Event Hubs 네임스페이스에 포함되어 있다고 가정합니다. 그러나 문서 및 원본 서비스의 실제 형식 및 서식과 해당 통합 형식은 특정 시나리오 및 아키텍처에 따라 크게 달라집니다.

스트리밍

Spark 구조적 스트리밍을 사용하면 원시 데이터를 가져오고, 압축을 풀고, 구문 분석하고, 스트리밍 DataFrame의 테이블 형식 데이터로 변환합니다.

다음 PySpark 코드 조각은 Event Hubs에서 스트리밍 DataFrame을 로드하는 데 사용됩니다.

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

다음 코드 조각은 스트리밍 DataFrame을 처리하는 데 사용됩니다. 필요한 경우 먼저 Event Hubs 메시지의 압축을 푼 다음, 해당 json 구조를 테이블 형식으로 구문 분석합니다. 이 코드는 예제이며 특정 시나리오에 맞게 조정해야 합니다.

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

실제 데이터 처리는 두 단계로 구성됩니다. 첫 번째 단계는 처리 후 원시 문서를 검색하는 데 도움이 되는 메타데이터를 추출하는 것입니다. 실제 메타데이터는 사용 사례에 따라 다르지만, 일반화 가능한 예는 관련 날짜 및 식별자, 문서 형식, 원본 서비스 및 모든 유형의 범주입니다.

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

두 번째 처리 단계는 원시 문서를 저장할 Azure Data Lake Storage Gen2에 대한 경로를 생성하는 것입니다.

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

델타 레이크의 메타데이터 수집

메타데이터는 실시간 쿼리 기능을 사용하도록 설정하는 델타 테이블에 기록됩니다. 쓰기는 버퍼에서 스트림되며, 테이블에 대한 쿼리는 버퍼의 결과를 테이블의 기록 부분의 결과와 병합할 수 있습니다.

다음 코드 조각에서는 메타스토어에서 델타 테이블을 정의하고 날짜별로 분할하는 방법을 보여 줍니다.

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

transactionId 필드는 숫자입니다. 분산 시스템을 전달하는 일반적인 메시지는 GUID를 대신 사용하여 트랜잭션을 고유하게 식별할 수 있습니다. 그러나 숫자 데이터 형식을 사용하면 대부분의 데이터 플랫폼에서 쿼리 성능이 향상됩니다.

고유한 트랜잭션 식별자를 할당하는 것은 클라우드 데이터 플랫폼(예: Spark)의 분산 특성을 고려할 때 어려울 수 있습니다. 유용한 방법은 이러한 트랜잭션 식별자에서 파티션 식별자(예: Event Hubs 파티션 번호) 및 파티션 내 증분 번호를 기반으로 하는 것입니다. 이 방법의 예는 Azure Databricks의 monotonically_increasing_id()입니다.

다음 코드 조각에서는 원시 문서의 메타데이터가 포함된 스트림을 델타 테이블에 추가하는 방법을 보여 줍니다.

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

분할은 테이블 스키마에 따라 스트림을 작성하는 동안 관리됩니다.

데이터 레이크의 데이터 수집

실제 원시 문서는 Azure Data Lake Gen2의 적절한 스토리지 성능 계층에 기록됩니다.

다음 코드 조각에서는 파일을 Azure Data Lake Store Gen2에 업로드하는 간단한 함수를 보여 줍니다. DataStreamWriter 클래스에서 foreach 메서드를 사용하면 스트리밍 DataFrame의 각 레코드에서 호스트되는 파일을 업로드할 수 있습니다.

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Client

클라이언트는 메타데이터를 사용하여 표준 SQL 문을 통해 델타 테이블에서 문서 경로를 검색하고, 이에 따라 표준 Azure Data Lake Storage Gen2 API를 사용하여 데이터 레이크에서 실제 문서를 검색하는 사용자 지정 웹 애플리케이션일 수 있습니다.

예를 들어 다음 코드 조각에서는 특정 트랜잭션에서 모든 문서의 경로를 검색하는 방법을 보여 줍니다.

select * from metadata where transactionId = '123456'

다음 단계

관련 아키텍처 지침을 참조하세요.

다음 관련 아키텍처를 참조하세요.