Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Importante
O conector Confluence está em Beta.
Esta página descreve como criar um pipeline de ingestão Confluence usando o Lakeflow Connect do Databricks. As seguintes interfaces são suportadas:
- Pacotes de Ativos da Databricks
- Databricks APIs
- Databricks SDKs
- CLI do Databricks
Antes de começar
Para criar o pipeline de ingestão, você deve atender aos seguintes requisitos:
Seu espaço de trabalho deve estar habilitado para o Catálogo Unity.
A computação sem servidor deve ser habilitada para seu espaço de trabalho. Consulte Requisitos de computação sem servidor.
Se você planeja criar uma nova conexão: você deve ter
CREATE CONNECTIONprivilégios no metastore.Se o conector oferecer suporte à criação de pipeline baseada em interface do usuário, um administrador poderá criar a conexão e o pipeline ao mesmo tempo concluindo as etapas nesta página. No entanto, se os usuários que criam pipelines usam a criação de pipeline baseada em API ou são usuários não administradores, um administrador deve primeiro criar a conexão no Gerenciador de Catálogos. Consulte Conectar-se a fontes de ingestão gerenciadas.
Se pretendes usar uma conexão existente: Deves ter
USE CONNECTIONprivilégios ouALL PRIVILEGESno objeto de conexão.Você deve ter
USE CATALOGprivilégios no catálogo de destino.Você deve ter
USE SCHEMAeCREATE TABLEprivilégios em um esquema existente ouCREATE SCHEMAprivilégios no catálogo de destino.
Para ingerir a partir da Confluence, veja Configurar OAuth U2M para a ingestão da Confluence.
Criar o pipeline de ingestão
É necessário ter USE CONNECTION ou ALL PRIVILEGES numa ligação para criar um pipeline de ingestão.
Este passo descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada numa tabela de streaming com o mesmo nome.
Pacotes de Ativos da Databricks
Crie um novo pacote usando a CLI do Databricks:
databricks bundle initAdicione dois novos arquivos de recursos ao pacote:
- Um ficheiro de definição de pipeline (
resources/confluence_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/confluence_job.yml).
O seguinte é um ficheiro de exemplo
resources/confluence_pipeline.yml:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for confluence_dab resources: pipelines: pipeline_confluence: name: confluence_pipeline catalog: ${var.dest_catalog} target: ${var.dest_schema} ingestion_definition: connection_name: confluence_connection objects: - table: source_schema: default source_table: pages destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} destination_table: <table-name>O seguinte é um ficheiro de exemplo
resources/confluence_job.yml:resources: jobs: confluence_dab_job: name: confluence_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_confluence.id}- Um ficheiro de definição de pipeline (
Implante o pipeline usando a CLI do Databricks:
databricks bundle deploy
Caderno de Notas do Databricks
Célula 1
Esta célula inicializa o ambiente, autentica a API REST do Databricks e define uma função auxiliar para verificar as respostas da API. Não modifiquem esta célula.
import json
import requests
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
Célula 2
Esta célula define funções para interagir com a API Pipelines (criar, editar, eliminar). Não modifiquem esta célula.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
Célula 3
Esta célula cria um pipeline de ingestão. Modifique esta célula com os detalhes do seu pipeline.
Pode escrever em vários catálogos de destino ou esquemas de dados. No entanto, pipelines com múltiplos destinos não suportarão a edição da interface de utilizador quando ela estiver disponível.
pipeline_name = "YOUR_PIPELINE_NAME"
connection_name = "YOUR_CONNECTION_NAME"
pipeline_spec = {
"name": pipeline_name,
"ingestion_definition": {
"connection_name": connection_name,
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "pages",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "spaces",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "attachments",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "classification_levels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "labels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "blogposts",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
CLI do Databricks
Execute o seguinte comando:
databricks pipelines create --json "<pipeline definition or json file path>"
Modelo de definição de pipeline
Valores de especificação da tabela a modificar:
-
name: Um nome único para o pipeline. -
connection_name: A ligação do Unity Catalog que armazena os detalhes de autenticação da Confluence. -
source_schema:default -
source_table:pages,spaces,labels,classification_levels,blogposts, , ouattachments -
destination_catalog: Um nome para o catálogo de destino que conterá os dados ingeridos. -
destination_schema: Um nome para o esquema de destino que conterá os dados ingeridos. -
scd_type: O método SCD a utilizar:SCD_TYPE_1ouSCD_TYPE_2. O padrão é SCD tipo 1. Para obter mais informações, consulte Habilitar rastreamento de histórico (SCD tipo 2).
Modelo de especificação de tabela:
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "<CONFLUENCE_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_1"
}
}
}
]
}
}
"""
Próximos passos
- Inicie, agende e defina alertas no seu pipeline.