Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Se aplica a:
Databricks SQL
Crea una tabla de flujo de datos, que es una tabla Delta con compatibilidad adicional para el procesamiento de datos de manera incremental o de transmisión.
Las tablas de streaming solo se admiten en las canalizaciones declarativas de Lakeflow Spark y en Databricks SQL con Unity Catalog. Al ejecutar este comando en un entorno de ejecución compatible con Databricks Runtime, solo se parsea la sintaxis. Vea Desarrollo de código de canalizaciones declarativas de Lakeflow Spark con SQL.
Sintaxis
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
Parámetros
REFRESH
Si se especifica, actualiza la tabla con los datos más recientes disponibles de los orígenes definidos en la consulta. Solo se procesan los nuevos datos que llegan antes de que se inicie la consulta. Los nuevos datos que se agregan a los orígenes durante la ejecución del comando se omiten hasta la siguiente actualización. La operación de actualización de CREATE OR REFRESH es totalmente declarativa. Si un comando refresh no especifica todos los metadatos de la instrucción original de creación de la tabla, se eliminan los metadatos no especificados.
SI NO EXISTE
Crea la tabla de streaming si no existe. Si ya existe una tabla con este nombre, se omitirá la instrucción
CREATE STREAMING TABLE.Puede especificar como máximo uno de
IF NOT EXISTSoOR REFRESH.-
Nombre de la tabla que se va a crear. El nombre no debe incluir una especificación temporal ni una especificación de opciones. Si el nombre no está calificado, la tabla se crea en el esquema actual.
especificación_de_tabla
Esta cláusula opcional define la lista de columnas y sus tipos, propiedades, descripciones y restricciones de columnas.
Si no define columnas en el esquema de la tabla, debe especificar
AS query.-
Nombre único para la columna.
-
Especifica el tipo de datos de la columna.
NOT NULL
Si se especifica, la columna no acepta valores
NULL.COMENTARIO column_comment
Literal de cadena para describir la columna.
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal o una restricción de clave externa a la columna de una tabla de transmisión. No se admiten restricciones para las tablas del catálogo
hive_metastore. -
Agrega una función de máscara de columna para anonimizar datos confidenciales. Todas las consultas posteriores de esa columna reciben el resultado de evaluar esa función sobre la columna en lugar del valor original de la columna. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si expurga el valor.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ EN CASO DE VIOLACIÓN { FALLAR UPDATE | ELIMINAR FILA } ]
Agrega expectativas de calidad de datos a la tabla. Se puede realizar un seguimiento de estas expectativas de calidad de los datos se pueden realizar a lo largo del tiempo y acceder a ellas a través del registro de eventos de la tabla de streaming. Una
FAIL UPDATEexpectativa hace que se produzca un error en el procesamiento al crear la tabla, así como al actualizar la tabla. UnaDROP ROWexpectativa hace que se quite toda la fila si no se cumple la expectativa.expectation_exprpuede estar compuesto por literales, identificadores de columna dentro de la tabla y funciones u operadores SQL integrados y deterministas, a excepción de lo siguiente:-
Funciones de agregado
- Funciones de ventana analítica
- Funciones de ventana de clasificación
- Funciones generadoras de valores de tabla
Además,
exprno deben contener ninguna subconsulta .-
Funciones de agregado
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal informativa o restricciones de clave externa informativa a una tabla de streaming. No se admiten restricciones de clave para las tablas del catálogo
hive_metastore.
-
-
cláusulas_tabla
Opcionalmente, especifique la creación de particiones, los comentarios, las propiedades definidas por el usuario y una programación de actualización para la nueva tabla. Cada subcláusula solo se puede especificar una vez.
-
Lista opcional de columnas de la tabla por la que se va a particionar la tabla.
Nota:
La agrupación en clústeres líquidos proporciona una solución flexible y optimizada para la agrupación en clústeres. Considere la posibilidad de usar
CLUSTER BYen lugar dePARTITIONED BYpara las tablas de streaming. -
Una cláusula opcional para agrupar por un subconjunto de columnas. Use la agrupación automática en clústeres líquidos con
CLUSTER BY AUTOy Databricks elige de forma inteligente las claves de agrupación en clústeres para optimizar el rendimiento de las consultas. Consulte Uso de clústeres líquidos para tablas.La agrupación en clústeres líquidos no se puede combinar con
PARTITIONED BY. COMENTARIO table_comment
Una
STRINGliteral para describir la tabla.INTERCALACIÓN PREDETERMINADA UTF8_BINARY
Se aplica a:
Databricks SQL
Databricks Runtime 17.1 y versiones posterioresFuerza la intercalación predeterminada de la tabla de streaming a
UTF8_BINARY. Esta cláusula es obligatoria si el esquema en el que se crea la tabla tiene una intercalación predeterminada distinta deUTF8_BINARY. La intercalación predeterminada de la tabla de flujo se utiliza como intercalación predeterminada dentro delqueryy para los tipos de columna.-
Este parámetro opcional le permite establecer una o más propiedades que defina el usuario.
Usa esta configuración para especificar el canal de tiempo de ejecución para las canalizaciones declarativas de Spark de Lakeflow que se usa para ejecutar esta instrucción. Establezca el valor de la
pipelines.channelpropiedad en"PREVIEW"o"CURRENT". El valor predeterminado es"CURRENT". Para más información sobre los canales en tiempo de ejecución de las canalizaciones declarativas de Spark de Lakeflow, consulte Canales de tiempo de ejecución de las canalizaciones declarativas de Spark de Lakeflow. horario
La programación puede ser una
SCHEDULEinstrucción o unaTRIGGERinstrucción .SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }Para programar una actualización que se produce periódicamente, use la sintaxis
EVERY. Si se especifica la sintaxisEVERY, la tabla de streaming o la vista materializada se actualiza periódicamente en el intervalo especificado según el valor proporcionado, comoHOUR,HOURS,DAY,DAYS,WEEKoWEEKS. En la tabla siguiente se enumeran los valores enteros aceptados paranumber.Unidad de tiempo Valor entero HOUR or HOURS1 <= H <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= W <= 8 Nota:
Las formas singulares y plurales de la unidad de tiempo incluida son semánticamente equivalentes.
CRON cron_string [ AT TIME ZONE timezone_id ]Para programar una actualización mediante un valor cron de cuarzo. Se aceptan valores time_zone_values. No se admite
AT TIME ZONE LOCAL.Si
AT TIME ZONEno está presente, se usa la zona horaria de la sesión. SiAT TIME ZONEno está presente y no se establece la zona horaria de la sesión, se produce un error.SCHEDULEes equivalente semánticamente aSCHEDULE REFRESH.
La programación se puede proporcionar como parte del
CREATEcomando. Use ALTER STREAMING TABLE o ejecute el comandoCREATE OR REFRESHcon la cláusulaSCHEDULEpara modificar la programación de una tabla de streaming después de la creación.UPDATE
Importante
La
TRIGGER ON UPDATEcaracterística está en beta.Opcionalmente, establezca la tabla que se actualizará cuando se actualice un origen de datos ascendente, como máximo una vez cada minuto. Establezca un valor para para
AT MOST EVERYque requiera al menos un tiempo mínimo entre las actualizaciones.Los orígenes de datos ascendentes deben ser tablas Delta externas o administradas (incluidas las vistas materializadas o las tablas de streaming) o vistas administradas cuyas dependencias están limitadas a los tipos de tabla admitidos.
La habilitación de eventos de archivo puede hacer que los desencadenadores sean más eficaces y aumenta algunos de los límites de las actualizaciones del desencadenador.
trigger_intervales una instrucción INTERVAL que es al menos 1 minuto.TRIGGER ON UPDATEtiene las siguientes limitaciones- No más de 10 orígenes de datos ascendentes por tabla de streaming cuando se usa TRIGGER ON UPDATE.
- Se puede especificar un máximo de 1000 tablas de streaming o vistas materializadas con TRIGGER ON UPDATE.
- La
AT MOST EVERYcláusula tiene como valor predeterminado 1 minuto y no puede ser inferior a 1 minuto.
-
con cláusula ROW FILTER
Agrega una función de filtro de fila a la tabla. Todas las consultas posteriores de esa tabla reciben un subconjunto de las filas donde la función se evalúa como TRUE booleano. Esto puede ser útil para fines de control de acceso específicos en los que la función puede inspeccionar la identidad o las pertenencias a grupos del usuario que realiza la invocación para decidir si se filtran determinadas filas.
AS consulta
Esta cláusula rellena la tabla con los datos de
query. Esta consulta debe ser una consulta de streaming. Esto se puede lograr agregando laSTREAMpalabra clave a cualquier relación que desee procesar de forma incremental. Al especificar unqueryelemento y un elementotable_specificationjuntos, el esquema de tabla especificado entable_specificationdebe contener todas las columnas devueltas porquery, de lo contrario, obtendrá un error. Cualquier columna especificada entable_specificationpero no devuelta porquerydevuelve valoresnullcuando se consulta.
Diferencias entre tablas de streaming y otras tablas
Las tablas de streaming son tablas con estado, diseñadas para controlar cada fila solo una vez a medida que se procesa un conjunto de datos creciente. Dado que la mayoría de los conjuntos de datos crecen continuamente a lo largo del tiempo, las tablas de streaming son adecuadas para la mayoría de las cargas de trabajo de procesamiento. Las tablas de streaming son óptimas para las canalizaciones que requieren actualización de datos y baja latencia. Las tablas de streaming también pueden ser útiles para las transformaciones de escala masiva, ya que los resultados se pueden calcular incrementalmente a medida que llegan nuevos datos, manteniendo los resultados actualizados sin necesidad de volver a calcular completamente todos los datos de origen con cada actualización. Las tablas de streaming están diseñadas para orígenes de datos que son de solo anexión.
Las tablas de streaming aceptan comandos adicionales, como REFRESH, que procesa los datos más recientes disponibles en los orígenes proporcionados en la consulta. Los cambios en la consulta proporcionada solo se reflejan en los nuevos datos al realizar una llamada a REFRESH, no en los datos procesados previamente. Para aplicar también los cambios en los datos existentes, debe ejecutar REFRESH TABLE <table_name> FULL para realizar una FULL REFRESH. Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no conserven todo el historial de los datos ni tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.
Filtros de fila y máscaras de columna
Filtros de fila permiten especificar una función que se aplica como filtro cada vez que un recorrido de tabla captura filas. Estos filtros garantizan que las consultas posteriores solo devuelven filas para las que el predicado de filtro se evalúa como true.
Las máscaras de columna permiten enmascarar los valores de una columna cada vez que un examen de tabla captura filas. Todas las consultas futuras que implican esa columna recibirán el resultado de evaluar la función sobre la columna, reemplazando el valor original de la columna.
Para obtener más información sobre cómo usar filtros de fila y máscaras de columna, consulte Filtros de fila y máscaras de columna.
Administración de filtros de fila y máscaras de columna
Los filtros de fila y las máscaras de columna de las tablas de streaming deben agregarse, actualizarse o quitarse a través de la instrucción CREATE OR REFRESH.
Comportamiento
-
Actualizar como definidor: cuando las
CREATE OR REFRESHinstrucciones oREFRESHactualizan una tabla de streaming, las funciones de filtro de fila se ejecutan con los derechos del definidor (como propietario de la tabla). Esto significa que la actualización de la tabla usa el contexto de seguridad del usuario que creó la tabla de streaming. -
Consulta: aunque la mayoría de los filtros se ejecutan con los derechos del definidor, las funciones que comprueban el contexto del usuario (como
CURRENT_USERyIS_MEMBER) son excepciones. Estas funciones se ejecutan como invocador. Este enfoque aplica controles de acceso y seguridad de datos específicos del usuario en función del contexto del usuario actual.
Observabilidad
Use DESCRIBE EXTENDED, INFORMATION_SCHEMAo el Explorador de catálogos para examinar los filtros de fila y las máscaras de columna existentes que se aplican a una tabla de streaming determinada. Esta funcionalidad permite a los usuarios auditar y revisar las medidas de acceso y protección de datos en tablas de streaming.
Limitaciones
- Solo los propietarios de tablas pueden actualizar las tablas de streaming para obtener los datos más recientes.
-
ALTER TABLELos comandos no se permiten en las tablas de streaming. La definición y las propiedades de la tabla se deben modificar a través de la instrucciónCREATE OR REFRESHo ALTER STREAMING TABLE. - No se admite la evolución del esquema de tabla a través de comandos DML como
INSERT INTO, yMERGE. - Los comandos siguientes no se admiten en tablas de streaming:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- No se admite el uso compartido delta.
- No se admite cambiar el nombre de la tabla ni cambiar el propietario.
- No se admiten restricciones de tabla como
PRIMARY KEYyFOREIGN KEYpara tablas de streaming en elhive_metastorecatálogo. - No se admiten columnas generadas, columnas de identidad y columnas predeterminadas.
Ejemplos
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')