Partilhar via


Criar um pipeline de ingestão do Confluence

Importante

O conector Confluence está em Beta.

Esta página descreve como criar um pipeline de ingestão Confluence usando o Lakeflow Connect do Databricks. As seguintes interfaces são suportadas:

  • Pacotes de Ativos da Databricks
  • Databricks APIs
  • Databricks SDKs
  • CLI do Databricks

Antes de começar

Para criar o pipeline de ingestão, você deve atender aos seguintes requisitos:

  • Seu espaço de trabalho deve estar habilitado para o Catálogo Unity.

  • A computação sem servidor deve ser habilitada para seu espaço de trabalho. Consulte Requisitos de computação sem servidor.

  • Se você planeja criar uma nova conexão: você deve ter CREATE CONNECTION privilégios no metastore.

    Se o conector oferecer suporte à criação de pipeline baseada em interface do usuário, um administrador poderá criar a conexão e o pipeline ao mesmo tempo concluindo as etapas nesta página. No entanto, se os usuários que criam pipelines usam a criação de pipeline baseada em API ou são usuários não administradores, um administrador deve primeiro criar a conexão no Gerenciador de Catálogos. Consulte Conectar-se a fontes de ingestão gerenciadas.

  • Se pretendes usar uma conexão existente: Deves ter USE CONNECTION privilégios ou ALL PRIVILEGES no objeto de conexão.

  • Você deve ter USE CATALOG privilégios no catálogo de destino.

  • Você deve ter USE SCHEMA e CREATE TABLE privilégios em um esquema existente ou CREATE SCHEMA privilégios no catálogo de destino.

Para ingerir a partir da Confluence, veja Configurar OAuth U2M para a ingestão da Confluence.

Criar o pipeline de ingestão

É necessário ter USE CONNECTION ou ALL PRIVILEGES numa ligação para criar um pipeline de ingestão.

Este passo descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada numa tabela de streaming com o mesmo nome.

Pacotes de Ativos da Databricks

  1. Crie um novo pacote usando a CLI do Databricks:

    databricks bundle init
    
  2. Adicione dois novos arquivos de recursos ao pacote:

    • Um ficheiro de definição de pipeline (resources/confluence_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (resources/confluence_job.yml).

    O seguinte é um ficheiro de exemplo resources/confluence_pipeline.yml:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for confluence_dab
    resources:
      pipelines:
        pipeline_confluence:
          name: confluence_pipeline
          catalog: ${var.dest_catalog}
          target: ${var.dest_schema}
          ingestion_definition:
            connection_name: confluence_connection
            objects:
              - table:
                  source_schema: default
                  source_table: pages
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: <table-name>
    

    O seguinte é um ficheiro de exemplo resources/confluence_job.yml:

    resources:
      jobs:
        confluence_dab_job:
          name: confluence_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_confluence.id}
    
  3. Implante o pipeline usando a CLI do Databricks:

    databricks bundle deploy
    

Caderno de Notas do Databricks

Célula 1

Esta célula inicializa o ambiente, autentica a API REST do Databricks e define uma função auxiliar para verificar as respostas da API. Não modifiquem esta célula.

import json
import requests
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"


headers = {
   'Authorization': 'Bearer {}'.format(api_token),
   'Content-Type': 'application/json'
}


def check_response(response):
   if response.status_code == 200:
       print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
   else:
       print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

Célula 2

Esta célula define funções para interagir com a API Pipelines (criar, editar, eliminar). Não modifiquem esta célula.

def create_pipeline(pipeline_definition: str):
 response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
 check_response(response)


def edit_pipeline(id: str, pipeline_definition: str):
 response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
 check_response(response)


def delete_pipeline(id: str):
 response = requests.delete(url=f"{api_url}/{id}", headers=headers)
 check_response(response)


def list_pipeline(filter: str):
 body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
 response = requests.get(url=api_url, headers=headers, data=body)
 check_response(response)


def get_pipeline(id: str):
 response = requests.get(url=f"{api_url}/{id}", headers=headers)
 check_response(response)


def start_pipeline(id: str, full_refresh: bool=False):
 body = f"""
 {{
   "full_refresh": {str(full_refresh).lower()},
   "validate_only": false,
   "cause": "API_CALL"
 }}
 """
 response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
 check_response(response)

Célula 3

Esta célula cria um pipeline de ingestão. Modifique esta célula com os detalhes do seu pipeline.

Pode escrever em vários catálogos de destino ou esquemas de dados. No entanto, pipelines com múltiplos destinos não suportarão a edição da interface de utilizador quando ela estiver disponível.

pipeline_name = "YOUR_PIPELINE_NAME"
connection_name = "YOUR_CONNECTION_NAME"
pipeline_spec = {
 "name": pipeline_name,
 "ingestion_definition": {
     "connection_name": connection_name,
     "objects": [
       {
         "table": {
           "source_schema": "default",
           "source_table": "pages",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
         }
       },
       {
         "table": {
           "source_schema": "default",
           "source_table": "spaces",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
         }
       },
       {
         "table": {
           "source_schema": "default",
           "source_table": "attachments",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
         }
       },
       {
         "table": {
           "source_schema": "default",
           "source_table": "classification_levels",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
           }
       },
       {
         "table": {
           "source_schema": "default",
           "source_table": "labels",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
         }
       },
       {
         "table": {
           "source_schema": "default",
           "source_table": "blogposts",
           "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
           "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
           "destination_table": "YOUR_DESTINATION_TABLE_NAME"
         }
       }
     ]
 }
}


json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)

CLI do Databricks

Execute o seguinte comando:

databricks pipelines create --json "<pipeline definition or json file path>"

Modelo de definição de pipeline

Valores de especificação da tabela a modificar:

  • name: Um nome único para o pipeline.
  • connection_name: A ligação do Unity Catalog que armazena os detalhes de autenticação da Confluence.
  • source_schema: default
  • source_table: pages, spaces, labels, classification_levels, blogposts, , ou attachments
  • destination_catalog: Um nome para o catálogo de destino que conterá os dados ingeridos.
  • destination_schema: Um nome para o esquema de destino que conterá os dados ingeridos.
  • scd_type: O método SCD a utilizar: SCD_TYPE_1 ou SCD_TYPE_2. O padrão é SCD tipo 1. Para obter mais informações, consulte Habilitar rastreamento de histórico (SCD tipo 2).

Modelo de especificação de tabela:

pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTION_NAME>",
     "objects": [
        {
          "table": {
            "source_schema": "default",
            "source_table": "<CONFLUENCE_TABLE_NAME>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
            "table_configuration": {
              "scd_type": "SCD_TYPE_1"
            }
          }
        }
      ]
 }
}
"""

Próximos passos

Recursos adicionais