Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
O conector do Microsoft SQL Server está em Visualização Pública.
Esta página descreve como ingerir dados do SQL Server e carregá-los no Azure Databricks usando o Lakeflow Connect. O conector do SQL Server dá suporte aos bancos de dados SQL do Azure SQL e do Amazon RDS SQL. Isso inclui o SQL Server em execução em VMs (máquinas virtuais) do Azure e no Amazon EC2. O conector também dá suporte ao SQL Server local usando a rede do Azure ExpressRoute e do AWS Direct Connect.
Visão geral das etapas
- Configure o banco de dados de origem para ingestão.
- Crie um gateway de ingestão. O gateway se conecta ao banco de dados do SQL Server, extrai dados instantâneos e de modificações e os armazena em um volume do Catálogo Unity para preparação.
- Crie uma pipeline de ingestão. O pipeline aplica dados instantâneos e dados de alteração do volume de estágio nas tabelas de streaming de destino.
- Agende o pipeline de ingestão.
Antes de começar
Para criar um pipeline de ingestão, você deve atender aos seguintes requisitos:
Você tem acesso a uma instância primária do SQL Server. Não há suporte para recursos de controle de alterações e de captura de dados de alterações em réplicas de leitura ou instâncias secundárias.
Seu espaço de trabalho está habilitado para o Unity Catalog.
A computação sem servidor está habilitada para notebooks, fluxos de trabalho e Pipelines Declarativos do Lakeflow. Confira Habilitar a computação sem servidor.
Para criar uma nova conexão:
CREATE CONNECTION
no metastore.Para usar uma conexão existente:
USE CONNECTION
ouALL PRIVILEGES
na conexão.USE CATALOG
no catálogo de destino.USE SCHEMA
,CREATE TABLE
eCREATE VOLUME
em um esquema existente ouCREATE SCHEMA
no catálogo de destino.Permissões irrestritas para criar clusters ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:
Família: Computação de Trabalho
Substituições da família de políticas:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }
O Databricks recomenda especificar os menores nós de trabalho possíveis para gateways de ingestão porque eles não afetam o desempenho do gateway.
"driver_node_type_id": { "type": "unlimited", "defaultValue": "r5.xlarge", "isOptional": true }, "node_type_id": { "type": "unlimited", "defaultValue": "m4.large", "isOptional": true }
Para obter mais informações sobre políticas de cluster, consulte Selecionar uma política de cluster.
Configurar o banco de dados de origem para ingestão
Consulte Configurar o Microsoft SQL Server para ingestão no Azure Databricks.
Criar uma conexão do SQL Server
- No workspace do Azure Databricks, clique em Catálogo > Dados Externos > Conexões.
- Clique em Criar conexão. Se você não vir esse botão, não terá
CREATE CONNECTION
privilégios. - Insira um Nome de conexão exclusivo.
- Para Tipo de conexão, selecione SQL Server.
- Para Host, especifique o nome de domínio do SQL Server.
- Para Usuário e Senha, insira suas credenciais de logon do SQL Server.
- Clique em Criar.
Criar um catálogo e esquemas de preparo
O catálogo e o esquema de preparo podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de estágio não pode ser um catálogo estrangeiro.
Gerenciador de Catálogos
- No workspace do Azure Databricks, clique em Catálogo.
- Na guia Catálogo, siga um destes procedimentos:
- Clique em Criar catálogo. Se você não vir esse botão, não terá
CREATE CATALOG
privilégios. - Insira um nome exclusivo para o catálogo e clique em Criar.
- Selecione o catálogo que você criou.
- Clique em Criar esquema. Se você não vir esse botão, não terá
CREATE SCHEMA
privilégios. - Insira um nome exclusivo para o esquema e clique em Criar.
Interface de Linha de Comando (CLI)
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Criar o pipeline de gateway e ingestão
O gateway de ingestão extrai dados de instantâneo e de alteração do banco de dados de origem e os armazena no volume de preparação do Catálogo do Unity. É necessário operacionalizar o gateway como um fluxo contínuo. Isso ajuda a acomodar todas as políticas de retenção de log de alterações que você tem no banco de dados de origem.
O pipeline de ingestão aplica os dados de alteração e instantâneo do volume de preparo nas tabelas de streaming de destino.
Observação
Cada pipeline de ingestão deve ser associado a exatamente um gateway de ingestão.
O pipeline de ingestão não dá suporte a mais de um catálogo e esquema de destino. Se você precisar escrever em vários catálogos ou esquemas de destino, crie vários pares de gateway-pipeline.
Pacotes de ativos do Databricks
Esta guia descreve como implantar um pipeline de ingestão usando pacotes de ativos do Databricks. Os pacotes podem conter definições YAML de trabalhos e tarefas, são gerenciados usando a CLI do Databricks e podem ser compartilhados e executados em diferentes workspaces de destino (como desenvolvimento, preparo e produção). Para obter mais informações, confira Pacotes de Ativos do Databricks.
Crie um novo pacote usando a CLI do Databricks:
databricks bundle init
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/sqlserver_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/sqlserver.yml
).
A seguir, é mostrado um arquivo
resources/sqlserver_pipeline.yml
de exemplo:variables: # Common variables used multiple places in the DAB definition. gateway_name: default: sqlserver-gateway dest_catalog: default: main dest_schema: default: ingest-destination-schema resources: pipelines: gateway: name: ${var.gateway_name} gateway_definition: connection_name: <sqlserver-connection> gateway_storage_catalog: main gateway_storage_schema: ${var.dest_schema} gateway_storage_name: ${var.gateway_name} target: ${var.dest_schema} catalog: ${var.dest_catalog} channel: PREVIEW pipeline_sqlserver: name: sqlserver-ingestion-pipeline ingestion_definition: ingestion_gateway_id: ${resources.pipelines.gateway.id} objects: # Modify this with your tables! - table: # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item. source_catalog: test source_schema: ingestion_demo source_table: lineitem destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - schema: # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination # table name will be the same as it is on the source. source_catalog: test source_schema: ingestion_whole_schema destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} target: ${var.dest_schema} catalog: ${var.dest_catalog} channel: PREVIEW
A seguir, é mostrado um arquivo
resources/sqlserver_job.yml
de exemplo:resources: jobs: sqlserver_dab_job: name: sqlserver_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
- Um arquivo de definição de pipeline (
Implante o pipeline usando a CLI do Databricks:
databricks bundle deploy
Caderno
Atualize a célula Configuration
no notebook a seguir com a conexão de origem, catálogo de destino, esquema de destino e tabelas a serem ingeridas da origem.
Criar gateway e pipeline de ingestão
Interface de Linha de Comando (CLI)
Para criar o gateway:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para criar o pipeline de ingestão:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
Inicie, agende e defina alertas no seu pipeline
Você pode criar um agendamento para o pipeline na página de detalhes do pipeline.
Depois que o pipeline tiver sido criado, volte para o workspace do Azure Databricks e clique em Pipelines.
O novo pipeline é exibido na lista de pipelines.
Para exibir os detalhes do pipeline, clique no nome do pipeline.
Na página de detalhes do pipeline, você pode agendar o pipeline clicando em Agendar.
Para definir notificações no pipeline, clique em Configuraçõese, em seguida, adicione uma notificação.
Para cada cronograma que você adiciona a um pipeline, o Lakeflow Connect cria automaticamente uma tarefa para ele. O pipeline de ingestão é uma tarefa dentro do trabalho. Opcionalmente, você pode adicionar mais tarefas ao trabalho.
Verificar a ingestão de dados bem-sucedida
A exibição de lista na página de detalhes do pipeline mostra o número de registros processados à medida que os dados são carregados. Esses números são atualizados automaticamente.
As colunas Upserted records
e Deleted records
não são mostradas por padrão. Você pode habilitá-las clicando no botão de configuração de colunas e selecionando-as.