Compartilhar via


Ingerir dados do SQL Server

Importante

O conector do Microsoft SQL Server está em Visualização Pública.

Esta página descreve como ingerir dados do SQL Server e carregá-los no Azure Databricks usando o Lakeflow Connect. O conector do SQL Server dá suporte aos bancos de dados SQL do Azure SQL e do Amazon RDS SQL. Isso inclui o SQL Server em execução em VMs (máquinas virtuais) do Azure e no Amazon EC2. O conector também dá suporte ao SQL Server local usando a rede do Azure ExpressRoute e do AWS Direct Connect.

Visão geral das etapas

  1. Configure o banco de dados de origem para ingestão.
  2. Crie um gateway de ingestão. O gateway se conecta ao banco de dados do SQL Server, extrai dados instantâneos e de modificações e os armazena em um volume do Catálogo Unity para preparação.
  3. Crie uma pipeline de ingestão. O pipeline aplica dados instantâneos e dados de alteração do volume de estágio nas tabelas de streaming de destino.
  4. Agende o pipeline de ingestão.

Antes de começar

Para criar um pipeline de ingestão, você deve atender aos seguintes requisitos:

  • Você tem acesso a uma instância primária do SQL Server. Não há suporte para recursos de controle de alterações e de captura de dados de alterações em réplicas de leitura ou instâncias secundárias.

  • Seu espaço de trabalho está habilitado para o Unity Catalog.

  • A computação sem servidor está habilitada para notebooks, fluxos de trabalho e Pipelines Declarativos do Lakeflow. Confira Habilitar a computação sem servidor.

  • Para criar uma nova conexão: CREATE CONNECTION no metastore.

    Para usar uma conexão existente: USE CONNECTION ou ALL PRIVILEGES na conexão.

  • USE CATALOG no catálogo de destino.

  • USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou CREATE SCHEMA no catálogo de destino.

  • Permissões irrestritas para criar clusters ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:

    • Família: Computação de Trabalho

    • Substituições da família de políticas:

      {
        "cluster_type": {
          "type": "fixed",
          "value": "dlt"
        },
        "num_workers": {
          "type": "unlimited",
          "defaultValue": 1,
          "isOptional": true
        },
        "runtime_engine": {
          "type": "fixed",
          "value": "STANDARD",
          "hidden": true
        }
      }
      
    • O Databricks recomenda especificar os menores nós de trabalho possíveis para gateways de ingestão porque eles não afetam o desempenho do gateway.

      "driver_node_type_id": {
        "type": "unlimited",
        "defaultValue": "r5.xlarge",
        "isOptional": true
      },
      "node_type_id": {
        "type": "unlimited",
        "defaultValue": "m4.large",
        "isOptional": true
      }
      

    Para obter mais informações sobre políticas de cluster, consulte Selecionar uma política de cluster.

Configurar o banco de dados de origem para ingestão

Consulte Configurar o Microsoft SQL Server para ingestão no Azure Databricks.

Criar uma conexão do SQL Server

  1. No workspace do Azure Databricks, clique em Catálogo > Dados Externos > Conexões.
  2. Clique em Criar conexão. Se você não vir esse botão, não terá CREATE CONNECTION privilégios.
  3. Insira um Nome de conexão exclusivo.
  4. Para Tipo de conexão, selecione SQL Server.
  5. Para Host, especifique o nome de domínio do SQL Server.
  6. Para Usuário e Senha, insira suas credenciais de logon do SQL Server.
  7. Clique em Criar.

Criar um catálogo e esquemas de preparo

O catálogo e o esquema de preparo podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de estágio não pode ser um catálogo estrangeiro.

Gerenciador de Catálogos

  1. No workspace do Azure Databricks, clique em Catálogo.
  2. Na guia Catálogo, siga um destes procedimentos:
  3. Clique em Criar catálogo. Se você não vir esse botão, não terá CREATE CATALOG privilégios.
  4. Insira um nome exclusivo para o catálogo e clique em Criar.
  5. Selecione o catálogo que você criou.
  6. Clique em Criar esquema. Se você não vir esse botão, não terá CREATE SCHEMA privilégios.
  7. Insira um nome exclusivo para o esquema e clique em Criar.

Interface de Linha de Comando (CLI)

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "SQLSERVER",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "1433",
    "trustServerCertificate": "false",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Criar o pipeline de gateway e ingestão

O gateway de ingestão extrai dados de instantâneo e de alteração do banco de dados de origem e os armazena no volume de preparação do Catálogo do Unity. É necessário operacionalizar o gateway como um fluxo contínuo. Isso ajuda a acomodar todas as políticas de retenção de log de alterações que você tem no banco de dados de origem.

O pipeline de ingestão aplica os dados de alteração e instantâneo do volume de preparo nas tabelas de streaming de destino.

Observação

Cada pipeline de ingestão deve ser associado a exatamente um gateway de ingestão.

O pipeline de ingestão não dá suporte a mais de um catálogo e esquema de destino. Se você precisar escrever em vários catálogos ou esquemas de destino, crie vários pares de gateway-pipeline.

Pacotes de ativos do Databricks

Esta guia descreve como implantar um pipeline de ingestão usando pacotes de ativos do Databricks. Os pacotes podem conter definições YAML de trabalhos e tarefas, são gerenciados usando a CLI do Databricks e podem ser compartilhados e executados em diferentes workspaces de destino (como desenvolvimento, preparo e produção). Para obter mais informações, confira Pacotes de Ativos do Databricks.

  1. Crie um novo pacote usando a CLI do Databricks:

    databricks bundle init
    
  2. Adicione dois novos arquivos de recurso ao pacote:

    • Um arquivo de definição de pipeline (resources/sqlserver_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (resources/sqlserver.yml).

    A seguir, é mostrado um arquivo resources/sqlserver_pipeline.yml de exemplo:

    variables:
      # Common variables used multiple places in the DAB definition.
      gateway_name:
        default: sqlserver-gateway
      dest_catalog:
        default: main
      dest_schema:
        default: ingest-destination-schema
    
    resources:
      pipelines:
        gateway:
          name: ${var.gateway_name}
          gateway_definition:
            connection_name: <sqlserver-connection>
            gateway_storage_catalog: main
            gateway_storage_schema: ${var.dest_schema}
            gateway_storage_name: ${var.gateway_name}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
          channel: PREVIEW
    
        pipeline_sqlserver:
          name: sqlserver-ingestion-pipeline
          ingestion_definition:
            ingestion_gateway_id: ${resources.pipelines.gateway.id}
            objects:
              # Modify this with your tables!
              - table:
                  # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
                  source_catalog: test
                  source_schema: ingestion_demo
                  source_table: lineitem
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - schema:
                  # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
                  # table name will be the same as it is on the source.
                  source_catalog: test
                  source_schema: ingestion_whole_schema
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
          channel: PREVIEW
    

    A seguir, é mostrado um arquivo resources/sqlserver_job.yml de exemplo:

    resources:
      jobs:
        sqlserver_dab_job:
          name: sqlserver_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
    
  3. Implante o pipeline usando a CLI do Databricks:

    databricks bundle deploy
    

Caderno

Atualize a célula Configuration no notebook a seguir com a conexão de origem, catálogo de destino, esquema de destino e tabelas a serem ingeridas da origem.

Criar gateway e pipeline de ingestão

Obter laptop

Interface de Linha de Comando (CLI)

Para criar o gateway:

output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
  "connection_id": "'"$CONNECTION_ID"'",
  "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
  "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
  "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

Para criar o pipeline de ingestão:

databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
  "objects": [
    {"table": {
        "source_catalog": "tpc",
        "source_schema": "tpch",
        "source_table": "lineitem",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "<YOUR_DATABRICKS_TABLE>",
        }},
     {"schema": {
        "source_catalog": "tpc",
        "source_schema": "tpcdi",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
        }}
    ]
  }
}'

Inicie, agende e defina alertas no seu pipeline

Você pode criar um agendamento para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, volte para o workspace do Azure Databricks e clique em Pipelines.

    O novo pipeline é exibido na lista de pipelines.

  2. Para exibir os detalhes do pipeline, clique no nome do pipeline.

  3. Na página de detalhes do pipeline, você pode agendar o pipeline clicando em Agendar.

  4. Para definir notificações no pipeline, clique em Configuraçõese, em seguida, adicione uma notificação.

Para cada cronograma que você adiciona a um pipeline, o Lakeflow Connect cria automaticamente uma tarefa para ele. O pipeline de ingestão é uma tarefa dentro do trabalho. Opcionalmente, você pode adicionar mais tarefas ao trabalho.

Verificar a ingestão de dados bem-sucedida

A exibição de lista na página de detalhes do pipeline mostra o número de registros processados à medida que os dados são carregados. Esses números são atualizados automaticamente.

Verificar a replicação

As colunas Upserted records e Deleted records não são mostradas por padrão. Você pode habilitá-las clicando no botão de configuração de colunas Ícone de configuração de colunas e selecionando-as.

Recursos adicionais