다음을 통해 공유


CREATE STREAMING TABLE

적용 대상:예로 표시된 확인 Databricks SQL

스트리밍 또는 증분 데이터 처리를 추가로 지원하는 델타 테이블인스트리밍 테이블을 만듭니다.

스트리밍 테이블은 Lakeflow Spark 선언적 파이프라인 및 Unity 카탈로그를 사용하는 Databricks SQL에서만 지원됩니다. 지원되는 Databricks Runtime 환경에서 이 명령을 실행하면 구문만 분석됩니다. SQL을 사용하여 Lakeflow Spark 선언적 파이프라인 코드 개발을 참조하세요.

구문

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

매개 변수

  • REFRESH

    지정한 경우 쿼리에 정의된 원본에서 사용할 수 있는 최신 데이터로 테이블을 새로 고칩니다. 쿼리가 시작되기 전에 도착하는 새 데이터만 처리됩니다. 명령을 실행하는 동안 원본에 추가되는 새 데이터는 다음 새로 고침까지 무시됩니다. CREATE OR REFRESH 새로 고침 작업은 완전히 선언적입니다. refresh 명령이 원래 테이블 만들기 문의 모든 메타데이터를 지정하지 않으면 지정되지 않은 메타데이터가 삭제됩니다.

  • 존재하지 않는 경우

    스트리밍 테이블이 없으면 만듭니다. 이 이름의 테이블이 이미 있는 경우 CREATE STREAMING TABLE 문이 무시됩니다.

    IF NOT EXISTS 또는 OR REFRESH 중 최대 하나를 지정할 수 있습니다.

  • table_name

    만들 테이블의 이름입니다. 이름에는 임시 사양 또는 옵션 사양이 포함되어서는 안됩니다. 이름이 정규화되지 않은 경우 테이블이 현재 스키마에 만들어집니다.

  • 테이블_규격

    이 선택적 절은 열 목록, 해당 형식, 속성, 설명 및 열 제약 조건을 정의합니다.

    테이블 스키마에서 열을 정의하지 않으면 AS query지정해야 합니다.

    • column_identifier

      열의 고유한 이름입니다.

      • 열_유형

        열의 데이터 형식 지정합니다.

      • NOT NULL

        지정한 경우 열은 NULL 값을 허용하지 않습니다.

      • 코멘트 column_comment

        열에 대한 설명을 제공하는 문자열 리터럴입니다.

      • column_constraint

        중요한

        이 기능은 공개 미리 보기 상태입니다.

        스트리밍 테이블의 열에 기본 키 또는 외래 키 제약 조건을 추가합니다. hive_metastore 카탈로그의 테이블에는 제약 조건이 지원되지 않습니다.

      • MASK 절

        열 마스크 함수를 추가하여 중요한 데이터를 익명화합니다. 해당 열의 모든 후속 쿼리는 열의 원래 값 대신 열에 대해 해당 함수를 평가한 결과를 받습니다. 이 기능은 함수가 호출하는 사용자의 ID 또는 그룹 멤버 자격을 검사하여 값을 수정할지 여부를 결정할 수 있는 세분화된 액세스 제어 목적에 유용할 수 있습니다.

      • CONSTRAINT expectation_name 기대 (expectation_expr) [ 위반 시 { 실패 UPDATE | 행 삭제 } ]

        테이블에 데이터 품질 기대치를 추가합니다. 이러한 데이터 품질 기대치는 시간이 지남에 따라 추적되고 스트리밍 테이블의 이벤트 로그를 통해 액세스할 수 있습니다. FAIL UPDATE 예상으로 인해 테이블을 만들고 테이블을 새로 고치면 처리가 실패합니다. DROP ROW 기대치로 인해 기대치가 충족되지 않으면 전체 행이 삭제됩니다.

        expectation_expr 리터럴, 테이블 내의 열 식별자 및 다음을 제외한 결정적 기본 제공 SQL 함수 또는 연산자로 구성될 수 있습니다.

        또한 expr에는 하위 쿼리가 포함되어서는 안 됩니다.

      • 테이블 제약 조건

        중요한

        이 기능은 공개 미리 보기 상태입니다.

        스트리밍 테이블에 정보 기본 키 또는 정보 외래 키 제약 조건을 추가합니다. hive_metastore 카탈로그의 테이블에는 키 제약 조건이 지원되지 않습니다.

  • 테이블_조항

    필요에 따라 새 테이블에 대한 분할, 주석, 사용자 정의 속성 및 새로 고침 일정을 지정합니다. 각 하위 절은 한 번만 지정할 수 있습니다.

    • 에 의해 분할됨

      테이블을 분할할 테이블 열의 선택적 목록입니다.

      참고

      Liquid 클러스터링에서는 클러스터링을 위한 유연하고 최적화된 솔루션을 제공합니다. 스트리밍 테이블에 CLUSTER BY 대신 PARTITIONED BY 사용하는 것이 좋습니다.

    • CLUSTER BY

      열 하위 집합을 기준으로 클러스터링할 수 있는 선택 사항입니다. 자동 액체 클러스터링을 CLUSTER BY AUTO사용하고 Databricks는 클러스터링 키를 지능적으로 선택하여 쿼리 성능을 최적화합니다. 테이블에 대한 액체 클러스터링 사용을 참조하세요.

      Liquid 클러스터링을 .와 결합 PARTITIONED BY할 수 없습니다.

    • 코멘트 table_comment

      테이블을 설명하기 위한 리터럴 STRING입니다.

    • 기본 데이터 정렬 UTF8_BINARY

      적용 대상:yes Databricks SQL 확인 표시 예 Databricks Runtime 17.1 이상으로 표시

      스트리밍 테이블의 기본 정렬을 UTF8_BINARY로 강제 적용합니다. 테이블이 만들어지는 스키마에 기본 데이터 정렬이 아닌 다른 데이터 정렬 UTF8_BINARY이 있는 경우 이 절은 필수입니다. 스트리밍 테이블의 기본 코레이션은 query 내에서 및 열 형식의 기본 코레이션으로 사용됩니다.

    • TBLPROPERTIES

      선택적으로 하나 이상의 사용자 정의 속성을 설정합니다.

      이 설정을 사용하여 이 문을 실행하는 데 사용되는 Lakeflow Spark 선언적 파이프라인 런타임 채널을 지정합니다. pipelines.channel 속성의 값을 "PREVIEW" 또는 "CURRENT"설정합니다. 기본값은 "CURRENT"입니다. Lakeflow Spark 선언적 파이프라인 채널에 대한 자세한 내용은 Lakeflow Spark 선언적 파이프라인 런타임 채널을 참조하세요.

    • 일정

      일정은 문 또는 문일 SCHEDULETRIGGER 수 있습니다.

      • 일정 [ REFRESH ] 일정_조항

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          주기적으로 발생하는 새로 고침을 예약하려면 EVERY 구문을 사용합니다. EVERY 구문을 지정하면 스트리밍 테이블 또는 구체화된 뷰는 제공된 값(예: HOUR, HOURS, DAY, DAYS, WEEK또는 WEEKS)에 따라 지정된 간격으로 주기적으로 새로 고쳐집니다. 다음 표에서는 number허용되는 정수 값을 나열합니다.

          시간 단위 정수 값
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1<은 D이고 <은 31입니다
          WEEK or WEEKS 1 <= W <= 8

          참고

          포함된 시간 단위의 단수 및 복수 형태는 의미상 동일합니다.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          새로 고침을 예약하려면 quartz cron 값을 사용합니다. 유효한 time_zone_values 허용됩니다. AT TIME ZONE LOCAL은 지원되지 않습니다.

          AT TIME ZONE이 없는 경우 세션 표준 시간대가 사용됩니다. AT TIME ZONE가 없고 세션 시간대가 설정되지 않으면 오류가 발생합니다. SCHEDULESCHEDULE REFRESH와 의미 체계가 같습니다.

        일정은 CREATE 명령의 일부로 제공할 수 있습니다. ALTER STREAMING TABLE 사용하거나 CREATE OR REFRESH 절과 함께 SCHEDULE 명령을 실행하여 만든 후 스트리밍 테이블의 일정을 변경합니다.

      • TRIGGER ON UPDATE [ 최대 모든 trigger_interval ]

        중요한

        TRIGGER ON UPDATE 기능은 베타에 있습니다.

        필요에 따라 업스트림 데이터 원본이 업데이트될 때 1분에 한 번씩 테이블을 새로 고치도록 설정합니다. 새로 고침 사이에 최소 시간이 필요한 값을 AT MOST EVERY 설정합니다.

        업스트림 데이터 원본은 외부 또는 관리형 델타 테이블(구체화된 뷰 또는 스트리밍 테이블 포함) 또는 종속성이 지원되는 테이블 형식으로 제한되는 관리형 뷰여야 합니다.

        파일 이벤트를 사용하도록 설정하면 트리거 성능이 향상되고 트리거 업데이트에 대한 일부 제한이 증가할 수 있습니다.

        INTERVAL trigger_interval문은 1분 이상입니다.

        TRIGGER ON UPDATE 에는 다음과 같은 제한 사항이 있습니다.

        • TRIGGER ON UPDATE을 사용하는 경우 스트리밍 테이블당 10개 이하의 업스트림 데이터 원본이 있습니다.
        • TRIGGER ON UPDATE을 사용하여 최대 1,000개 스트리밍 테이블 또는 구체화된 뷰를 지정할 수 있습니다.
        • 이 절은 AT MOST EVERY 기본적으로 1분이며 1분 미만일 수 없습니다.
  • ROW FILTER 절과 함께

    테이블에 행 필터 함수를 추가합니다. 해당 테이블의 모든 후속 쿼리는 함수가 부울 TRUE로 평가되는 행의 하위 집합을 받습니다. 이는 함수가 호출하는 사용자의 ID 또는 그룹 멤버 자격을 검사하여 특정 행을 필터링할지 여부를 결정할 수 있는 세분화된 액세스 제어 목적에 유용할 수 있습니다.

  • AS 쿼리

    이 절은 query데이터를 사용하여 테이블을 채웁니다. 이 쿼리는 스트리밍 쿼리여야 합니다. 이 작업은 증분 방식으로 처리하려는 모든 관계로 STREAM 키워드를 추가하여 수행할 수 있습니다. query table_specification 함께 지정하면 table_specification 지정된 테이블 스키마에 query반환된 모든 열이 포함되어야 합니다. 그렇지 않으면 오류가 발생합니다. table_specification 지정되었지만 query 반환되지 않은 열은 쿼리할 때 null 값을 반환합니다.

스트리밍 테이블과 다른 테이블 간의 차이점

스트리밍 테이블은 증가하는 데이터 세트를 처리할 때 각 행을 한 번만 처리하도록 설계된 상태 저장 테이블입니다. 대부분의 데이터 세트는 시간이 지남에 따라 지속적으로 증가하므로 스트리밍 테이블은 대부분의 수집 워크로드에 적합합니다. 스트리밍 테이블은 데이터 새로 고침 및 짧은 대기 시간이 필요한 파이프라인에 적합합니다. 스트리밍 테이블은 새 데이터가 도착할 때 결과를 증분 방식으로 계산할 수 있으므로 각 업데이트로 모든 원본 데이터를 완전히 다시 계산하지 않고도 결과를 최신 상태로 유지할 수 있으므로 대규모 변환에도 유용할 수 있습니다. 스트리밍 테이블은 추가 전용인 데이터 원본용으로 설계되었습니다.

스트리밍 테이블은 쿼리에 제공된 원본에서 사용할 수 있는 최신 데이터를 처리하는 REFRESH같은 추가 명령을 허용합니다. 제공된 쿼리에 대한 변경 내용은 이전에 처리된 데이터가 아닌 REFRESH호출하여 새 데이터에만 반영됩니다. 기존 데이터에도 변경 내용을 적용하려면 REFRESH TABLE <table_name> FULL을 실행하여 FULL REFRESH를 수행해야 합니다. 전체 새로 고침은 원본에서 사용 가능한 모든 데이터를 최신 정의로 다시 처리합니다. 전체 새로 고침이 기존 데이터를 자르기 때문에 전체 데이터 기록을 유지하지 않거나 Kafka와 같은 짧은 보존 기간이 있는 원본에서는 전체 새로 고침을 호출하지 않는 것이 좋습니다. 더 이상 원본에서 데이터를 사용할 수 없는 경우 이전 데이터를 복구하지 못할 수 있습니다.

행 필터 및 열 마스크

행 필터를 사용하면 테이블 검색이 행을 가져올 때마다 필터로 적용되는 함수를 지정할 수 있습니다. 이러한 필터를 통해 후속 쿼리는 필터 조건자가 true로 평가되는 행만 반환합니다.

열 마스크를 사용하면 테이블 검색이 행을 가져올 때마다 열 값을 마스킹할 수 있습니다. 해당 열과 관련된 모든 이후 쿼리는 열에 대한 함수를 평가하고 열의 원래 값을 대체한 결과를 받게 됩니다.

행 필터 및 열 마스크를 사용하는 방법에 대한 자세한 내용은 행 필터 및 열 마스크를 참조하세요.

행 필터 및 열 마스크 관리

스트리밍 테이블의 행 필터 및 열 마스크는 CREATE OR REFRESH 문을 통해 추가, 업데이트 또는 삭제해야 합니다.

행동

  • 정의자로 새로 고침: 또는 CREATE OR REFRESH 문이 스트리밍 테이블을 새로 고치면 REFRESH 행 필터 함수는 정의자의 권한(테이블 소유자)으로 실행됩니다. 즉, 테이블 새로 고침은 스트리밍 테이블을 만든 사용자의 보안 컨텍스트를 사용합니다.
  • 쿼리: 대부분의 필터는 정의자의 권한으로 실행되지만 사용자 컨텍스트(예: 및CURRENT_USER)를 IS_MEMBER 확인하는 함수는 예외입니다. 이러한 함수는 호출자로 실행됩니다. 이 방법은 현재 사용자의 컨텍스트에 따라 사용자별 데이터 보안 및 액세스 제어를 적용합니다.

가시성

DESCRIBE EXTENDED, INFORMATION_SCHEMA또는 카탈로그 탐색기를 사용하여 지정된 스트리밍 테이블에 적용되는 기존 행 필터 및 열 마스크를 검사합니다. 이 기능을 사용하면 사용자가 스트리밍 테이블에 대한 데이터 액세스 및 보호 조치를 감사하고 검토할 수 있습니다.

제한 사항

  • 테이블 소유자만 스트리밍 테이블을 새로 고쳐 최신 데이터를 가져올 수 있습니다.
  • ALTER TABLE 명령은 스트리밍 테이블에서 허용되지 않습니다. 테이블의 정의 및 속성은 CREATE OR REFRESH 또는 ALTER STREAMING TABLE 문을 통해 변경해야 합니다.
  • INSERT INTOMERGE 같은 DML 명령을 통해 테이블 스키마를 발전시키는 것은 지원되지 않습니다.
  • 스트리밍 테이블에서는 다음 명령이 지원되지 않습니다.
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing은 지원되지 않습니다.
  • 테이블 이름을 바꾸거나 소유자를 변경하는 것은 지원되지 않습니다.
  • 스트리밍 테이블에서는 PRIMARY KEYFOREIGN KEY와 같은 테이블 제약 조건이 hive_metastore 카탈로그에서 지원되지 않습니다.
  • 생성된 열, ID 열 및 기본 열은 지원되지 않습니다.

예제

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')