Udostępnij za pośrednictwem


Pozyskiwanie danych z programu SQL Server

Ważne

Łącznik programu Microsoft SQL Server jest w publicznej wersji zapoznawczej.

Na tej stronie opisano sposób pozyskiwania danych z programu SQL Server i ładowania ich do usługi Azure Databricks przy użyciu usługi Lakeflow Connect. Łącznik programu SQL Server obsługuje bazy danych SQL Azure SQL i Amazon RDS SQL. Obejmuje to program SQL Server uruchomiony na maszynach wirtualnych platformy Azure i w usłudze Amazon EC2. Łącznik obsługuje również SQL Server w lokalnym środowisku, korzystając z sieci Azure ExpressRoute i AWS Direct Connect.

Zanim rozpoczniesz

Aby utworzyć kanał danych wejściowych, musisz spełnić następujące wymagania:

  • Twoje środowisko pracy ma włączoną funkcję Unity Catalog.

  • W twoim obszarze roboczym włączono bezserwerowe obliczenia. Zobacz Włączanie przetwarzania bezserwerowego.

  • Jeśli planujesz utworzyć połączenie: masz uprawnienia CREATE CONNECTION w metastore.

    Jeśli łącznik obsługuje tworzenie potoków opartych na interfejsie użytkownika, możesz utworzyć połączenie i potok w tym samym czasie, wykonując kroki na tej stronie. Jeśli jednak używasz tworzenia potoku za pomocą interfejsu API, przed wykonaniem kroków na tej stronie należy utworzyć połączenie w Eksploratorze Katalogu. Zobacz Połączenie z zarządzanymi źródłami pozyskiwania danych.

  • Jeśli planujesz użyć istniejącego połączenia, musisz mieć USE CONNECTION uprawnienia lub ALL PRIVILEGES do połączenia.

  • Masz uprawnienia USE CATALOG w katalogu docelowym.

  • Masz uprawnienia USE SCHEMA, CREATE TABLE, i CREATE VOLUME w odniesieniu do istniejącego schematu lub masz uprawnienia CREATE SCHEMA w katalogu docelowym.

  • Masz dostęp do podstawowego wystąpienia programu SQL Server. Funkcje śledzenia zmian i przechwytywania danych o zmianach nie są obsługiwane w replikach do odczytu ani w wystąpieniach pomocniczych.

  • Nieograniczone uprawnienia do tworzenia klastrów lub zasad niestandardowych. Polityka niestandardowa musi spełniać następujące wymagania:

    • Rodzina: Obliczenia zadań

    • Przesłonięcia zestawów polityk:

      {
        "cluster_type": {
          "type": "fixed",
          "value": "dlt"
        },
        "num_workers": {
          "type": "unlimited",
          "defaultValue": 1,
          "isOptional": true
        },
        "runtime_engine": {
          "type": "fixed",
          "value": "STANDARD",
          "hidden": true
        }
      }
      
    • Usługa Databricks zaleca określenie najmniejszych możliwych węzłów roboczych dla bram zarządzania danymi, ze względu na to, że nie mają wpływu na wydajność bramy.

      "driver_node_type_id": {
        "type": "unlimited",
        "defaultValue": "r5.xlarge",
        "isOptional": true
      },
      "node_type_id": {
        "type": "unlimited",
        "defaultValue": "m4.large",
        "isOptional": true
      }
      

    Aby uzyskać więcej informacji na temat zasad klastra, zobacz Wybieranie zasad klastra.

Aby przetwarzać dane z SQL Server, należy również ukończyć konfigurację źródłową.

Opcja 1. Interfejs użytkownika usługi Azure Databricks

Użytkownicy administracyjni mogą utworzyć połączenie i potok w tym samym czasie w interfejsie użytkownika. Jest to najprostszy sposób zaprojektowania zarządzanych potoków wprowadzania danych.

  1. Na pasku bocznym obszaru roboczego usługi Azure Databricks kliknij pozycję Pozyskiwanie danych.

  2. Na stronie Dodawanie danych w obszarze Łączniki usługi Databricks kliknij pozycję SQL Server.

    Otwiera się kreator importowania.

  3. Na stronie kreatora Brama wprowadzania wprowadź unikatową nazwę bramy.

  4. Wybierz wykaz i schemat dla danych pozyskiwania przejściowego, a następnie kliknij przycisk Dalej.

  5. Na stronie Potok pozyskiwania wprowadź unikatową nazwę potoku.

  6. W obszarze Katalog docelowy wybierz katalog do przechowywania pozyskanych danych.

  7. Wybierz połączenie Unity Catalog, które przechowuje poświadczenia wymagane do uzyskania dostępu do danych źródłowych.

    Jeśli nie ma istniejących połączeń ze źródłem, kliknij przycisk Utwórz połączenie i wprowadź szczegóły uwierzytelniania uzyskane z konfiguracji źródłowej. Musisz mieć uprawnienia CREATE CONNECTION w metastore.

  8. Kliknij Utwórz kanał i kontynuuj.

  9. Na stronie Źródło wybierz tabele do importowania.

  10. Opcjonalnie zmień domyślne ustawienie śledzenia historii. Aby uzyskać więcej informacji, zobacz Śledzenie historii.

  11. Kliknij przycisk Dalej.

  12. Na stronie docelowej wybierz katalog Unity Catalog i schemat, do którego chcesz zapisać.

    Jeśli nie chcesz używać istniejącego schematu, kliknij przycisk Utwórz schemat. Musisz mieć uprawnienia USE CATALOG i CREATE SCHEMA w katalogu nadrzędnym.

  13. Kliknij przycisk Zapisz i kontynuuj.

  14. (Opcjonalnie) Na stronie Ustawienia kliknij pozycję Utwórz harmonogram. Ustaw częstotliwość odświeżania tabel docelowych.

  15. (Opcjonalnie) Ustaw powiadomienia e-mail dotyczące powodzenia lub niepowodzenia działania potoku.

  16. Kliknij Zapisz i uruchom potok.

Opcja 2. Inne interfejsy

Przed importowaniem za pomocą Databricks Asset Bundles, interfejsów API, zestawów SDK lub interfejsu wiersza polecenia Databricks, musisz mieć dostęp do istniejącego połączenia Unity Catalog. Aby uzyskać instrukcje, zobacz Połącz się z zarządzanymi źródłami pozyskiwania.

Utwórz katalog przejściowy i schemat

Katalog i schemat tymczasowy mogą być takie same jak katalog i schemat docelowy. Katalog przejściowy nie może być katalogiem obcym.

CLI

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "SQLSERVER",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "1433",
    "trustServerCertificate": "false",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Tworzenie bramy dostępu i potoku danych

Brama pobierania wyodrębnia dane migawkowe i zmiany ze źródłowej bazy danych i przechowuje je w woluminie przejściowym Unity Catalog. Bramę należy uruchomić jako ciągły proces. Pomaga to uwzględnić wszelkie zasady przechowywania dzienników zmian, które masz w źródłowej bazie danych.

Potok pozyskiwania stosuje migawkę i zmienia dane z woluminu przejściowego na docelowe tabele przesyłania strumieniowego.

Uwaga / Notatka

Każdy potok pozyskiwania musi być skojarzony z dokładnie jedną bramą pozyskiwania.

Potok przetwarzania danych nie obsługuje więcej niż jednego katalogu docelowego ani schematu. Jeśli musisz zapisać w wielu katalogach docelowych lub schematach, utwórz wiele par brama-potok.

Pakiety zasobów Databricks

Na tej karcie opisano sposób wdrażania potoku przetwarzania danych przy użyciu Databricks Asset Bundles. Pakiety mogą zawierać definicje YAML dotyczące prac i zadań, są zarządzane przy użyciu interfejsu wiersza polecenia usługi Databricks i mogą być udostępniane oraz uruchamiane w różnych docelowych obszarach roboczych (takich jak rozwój, środowisko testowe i produkcja). Aby uzyskać więcej informacji, zobacz Pakiety zasobów Databricks.

  1. Utwórz nowy pakiet przy użyciu interfejsu wiersza polecenia usługi Databricks:

    databricks bundle init
    
  2. Dodaj dwa nowe pliki zasobów do pakietu:

    • Plik definicji potoku danych (resources/sqlserver_pipeline.yml).
    • Plik przepływu pracy, który kontroluje częstotliwość pozyskiwania danych (resources/sqlserver.yml).

    Oto przykładowy resources/sqlserver_pipeline.yml plik:

    variables:
      # Common variables used multiple places in the DAB definition.
      gateway_name:
        default: sqlserver-gateway
      dest_catalog:
        default: main
      dest_schema:
        default: ingest-destination-schema
    
    resources:
      pipelines:
        gateway:
          name: ${var.gateway_name}
          gateway_definition:
            connection_name: <sqlserver-connection>
            gateway_storage_catalog: main
            gateway_storage_schema: ${var.dest_schema}
            gateway_storage_name: ${var.gateway_name}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
          channel: PREVIEW
    
        pipeline_sqlserver:
          name: sqlserver-ingestion-pipeline
          ingestion_definition:
            ingestion_gateway_id: ${resources.pipelines.gateway.id}
            objects:
              # Modify this with your tables!
              - table:
                  # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
                  source_catalog: test
                  source_schema: ingestion_demo
                  source_table: lineitem
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - schema:
                  # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
                  # table name will be the same as it is on the source.
                  source_catalog: test
                  source_schema: ingestion_whole_schema
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
          channel: PREVIEW
    

    Oto przykładowy resources/sqlserver_job.yml plik:

    resources:
      jobs:
        sqlserver_dab_job:
          name: sqlserver_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
    
  3. Wdróż pipeline za pomocą CLI Databricks.

    databricks bundle deploy
    

Notatnik

Zaktualizuj komórkę Configuration w poniższym notesie, używając połączenia źródłowego, oraz katalogu docelowego, schematu docelowego i tabel do pobrania ze źródła.

Tworzenie bramy i potoku pobierania danych

Pobierz notes

CLI

Aby utworzyć bramę:

output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
  "connection_id": "'"$CONNECTION_ID"'",
  "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
  "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
  "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

Aby utworzyć pipeline importu danych:

databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
  "objects": [
    {"table": {
        "source_catalog": "tpc",
        "source_schema": "tpch",
        "source_table": "lineitem",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "<YOUR_DATABRICKS_TABLE>",
        }},
     {"schema": {
        "source_catalog": "tpc",
        "source_schema": "tpcdi",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
        }}
    ]
  }
}'

Rozpocznij, zaplanuj i ustaw alerty w swoim rurociągu

Harmonogram przepływu można utworzyć na stronie szczegółów przepływu.

  1. Po utworzeniu potoku ponownie przejdź do obszaru roboczego usługi Azure Databricks, a następnie kliknij pozycję Potoki.

    Nowy rurociąg pojawia się na liście rurociągów.

  2. Aby wyświetlić szczegóły kolejki, kliknij jej nazwę.

  3. Na stronie szczegółów potoku możesz zaplanować potok, klikając pozycję Harmonogram.

  4. Aby ustawić powiadomienia dla potoku, kliknij Ustawienia, a następnie dodaj powiadomienie.

Dla każdego harmonogramu, który dodasz do potoku, Lakeflow Connect automatycznie tworzy zadanie dla tego harmonogramu. Potok przetwarzania jest zadaniem w ramach pracy. Opcjonalnie możesz dodać więcej zadań do zadania.

Weryfikowanie pomyślnego pozyskiwania danych

Widok listy na stronie szczegółów potoku przedstawia liczbę rekordów przetworzonych podczas pozyskiwania danych. Te liczby są odświeżane automatycznie.

Weryfikowanie replikacji

Kolumny Upserted records i Deleted records nie są domyślnie wyświetlane. Możesz je włączyć, klikając przycisk Ikona konfiguracji kolumn i wybierając je.

Dodatkowe zasoby