Ingerir dados do Azure Cosmos DB no Azure Data Explorer

O Azure Data Explorer dá suporte à ingestão de dados do Azure Cosmos DB for NoSql usando um feed de alterações. A conexão de dados do feed de alterações do Cosmos DB é um pipeline de ingestão que escuta o feed de alterações do Cosmos DB e ingere os dados em sua tabela de Data Explorer. O feed de alterações escuta documentos novos e atualizados, mas não registra exclusões. Para obter informações gerais sobre a ingestão de dados no Azure Data Explorer, confira Visão geral da ingestão de dados do Azure Data Explorer.

Cada conexão de dados escuta um contêiner específico do Cosmos DB e ingere dados em uma tabela especificada (mais de uma conexão pode ingerir em uma única tabela). O método de ingestão dá suporte à ingestão de streaming (quando habilitada) e à ingestão na fila.

Neste artigo, você aprenderá a configurar uma conexão de dados do feed de alterações do Cosmos DB para ingerir dados no Azure Data Explorer com a Identidade Gerenciada do Sistema. Examine as considerações antes de começar.

Use as seguintes etapas para configurar um conector:

Etapa 1: escolher uma tabela de Data Explorer do Azure e configurar o mapeamento de tabela

Etapa 2: Criar uma conexão de dados do Cosmos DB

Etapa 3: Testar a conexão de dados

Pré-requisitos

Etapa 1: escolher uma tabela de Data Explorer do Azure e configurar o mapeamento de tabela

Antes de criar uma conexão de dados, crie uma tabela em que você armazenará os dados ingeridos e aplicará um mapeamento que corresponde ao esquema no contêiner do Cosmos DB de origem. Se o cenário exigir mais do que um mapeamento simples de campos, você poderá usar políticas de atualização para transformar e mapear dados ingeridos do feed de alterações.

O seguinte mostra um esquema de exemplo de um item no contêiner do Cosmos DB:

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

Use as seguintes etapas para criar uma tabela e aplicar um mapeamento de tabela:

  1. No Azure Data Explorer interface do usuário da Web, no menu de navegação à esquerda, selecione Consulta e, em seguida, selecione o banco de dados no qual você deseja criar a tabela.

  2. Execute o comando a seguir para criar uma tabela chamada TestTable.

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. Execute o comando a seguir para criar o mapeamento de tabela.

    O comando mapeia propriedades personalizadas de um documento JSON do Cosmos DB para colunas na tabela TestTable , da seguinte maneira:

    Propriedade Cosmos DB Coluna da tabela Transformação
    id Id Nenhum
    name Nome Nenhum
    _Ts _ts Nenhum
    _Ts _Timestamp DateTimeFromUnixSeconds Usa para transformar_ts (segundos UNIX) para _timestamp (datetime))

    Observação

    É recomendável usar as seguintes colunas de carimbo de data/hora:

    • _ts: use esta coluna para reconciliar dados com o Cosmos DB.
    • _timestamp: use esta coluna para executar filtros de tempo eficientes em suas consultas Kusto. Para obter mais informações, consulte Melhor prática de consulta.
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

Transformar e mapear dados com políticas de atualização

Se o cenário exigir mais do que um mapeamento simples de campos, você poderá usar políticas de atualização para transformar e mapear dados ingeridos do feed de alterações.

As políticas de atualização são uma maneira de transformar dados conforme eles são ingeridos em sua tabela. Eles são escritos em Linguagem de Consulta Kusto e são executados no pipeline de ingestão. Eles podem ser usados para transformar dados de uma ingestão de feed de alterações do Cosmos DB, como nos seguintes cenários:

  • Seus documentos contêm matrizes que seriam mais fáceis de consultar se elas forem transformadas em várias linhas usando o mv-expand operador .
  • Você deseja filtrar documentos. Por exemplo, você pode filtrar documentos por tipo usando o where operador .
  • Você tem uma lógica complexa que não pode ser representada em um mapeamento de tabela.

Para obter informações sobre como criar e gerenciar políticas de atualização, consulte Visão geral da política de atualização.

Etapa 2: Criar uma conexão de dados do Cosmos DB

Você pode usar os seguintes métodos para criar o conector de dados:

  1. No portal do Azure, vá para a página de visão geral do cluster e selecione a guia Introdução.

  2. No bloco Ingestão de dados, selecione Criar conexão> de dadosCosmos DB.

    Captura de tela da guia Introdução, mostrando a opção Criar conexão de dados do Cosmos DB.

  3. No painel Criar conexão de dados do Cosmos DB, preencha o formulário com as informações na tabela:

    Captura de tela do painel de conexão de dados, mostrando os campos de formulário com valores.

    Campo Descrição
    Nome do banco de dados Escolha o banco de dados Data Explorer do Azure no qual você deseja ingerir dados.
    Nome da conexão de dados Especifique um nome para a conexão de dados.
    Assinatura Selecione a assinatura que contém sua conta NoSQL do Cosmos DB.
    Conta do Cosmos DB Escolha a conta do Cosmos DB da qual você deseja ingerir dados.
    Banco de dados SQL Escolha o banco de dados do Cosmos DB do qual você deseja ingerir dados.
    Contêiner de SQL Escolha o contêiner do Cosmos DB do qual você deseja ingerir dados.
    Nome da tabela Especifique o nome da tabela Data Explorer do Azure ao qual você deseja ingerir dados.
    Nome do mapeamento Opcionalmente, especifique o nome de mapeamento a ser usado para a conexão de dados.
  4. Opcionalmente, na seção Configurações avançadas , faça o seguinte:

    1. Especifique a data de início da recuperação de evento. Esse é o momento em que o conector começará a ingerir dados. Se você não especificar uma hora, o conector começará a ingerir dados a partir do momento em que você criar a conexão de dados. O formato de data recomendado é o padrão UTC ISO 8601, especificado da seguinte maneira: yyyy-MM-ddTHH:mm:ss.fffffffZ.

    2. Selecione Atribuído pelo usuário e, em seguida, selecione a identidade. Por Padrão, a identidade gerenciada atribuída pelo sistema é usada pela conexão. Se necessário, você pode usar uma identidade atribuída pelo usuário .

      Captura de tela do painel de conexão de dados, mostrando as configurações de Avanço.

  5. Selecione Criar para engradar a conexão de dados.

Etapa 3: Testar a conexão de dados

  1. No contêiner do Cosmos DB, insira o seguinte documento:

    {
        "name":"Cousteau"
    }
    
  2. Na interface do usuário da Web do Azure Data Explorer, execute a seguinte consulta:

    TestTable
    

    O conjunto de resultados deve ser parecido com a seguinte imagem:

    Captura de tela do painel de resultados mostrando o documento ingerido.

Observação

O Azure Data Explorer tem uma política de agregação (envio em lote) para ingestão de dados na fila projetada para otimizar o processo de ingestão. A política de envio em lote padrão é configurada para selar um lote depois que uma das seguintes condições for verdadeira para o lote: um tempo máximo de atraso de 5 minutos, tamanho total de um GB ou 1000 blobs. Portanto, você pode ter alguma latência. Para obter mais informações, confira a política de envio em lote. Para reduzir a latência, configure sua tabela para dar suporte ao streaming. Confira política de streaming.

Considerações

As seguintes considerações se aplicam ao feed de alterações do Cosmos DB:

  • O feed de alterações não expõe eventos de exclusão .

    O feed de alterações do Cosmos DB inclui apenas documentos novos e atualizados. Se você precisar de informações sobre documentos excluídos, poderá configurar o feed usando um marcador flexível para marcar um documento do Cosmos DB como excluído. Uma propriedade é adicionada para atualizar eventos que indicam se um documento foi excluído. Em seguida, você pode usar o where operador em suas consultas para filtrá-las.

    Por exemplo, se você mapear a propriedade excluída para uma coluna de tabela chamada IsDeleted, poderá filtrar documentos excluídos com a seguinte consulta:

    TestTable
    | where not(IsDeleted)
    
  • O feed de alterações expõe apenas a atualização mais recente de um documento.

    Para entender a ramificação da segunda consideração, examine o seguinte cenário:

    Um contêiner do Cosmos DB contém os documentos A e B. As alterações em uma propriedade chamada foo são mostradas na tabela a seguir:

    ID do documento Propriedade foo Evento Carimbo de data/hora do documento (_ts)
    A Vermelho Criação 10
    B Azul Criação 20
    A Laranja Atualizar 30
    A Rosa Atualizar 40
    B Violet Atualizar 50
    A Carmine Atualizar 50
    B NeonBlue Atualizar 70

    A API do feed de alterações é sondada pelo conector de dados em intervalos regulares, normalmente a cada poucos segundos. Cada sondagem contém alterações que ocorreram no contêiner entre chamadas, mas apenas a versão mais recente da alteração por documento.

    Para ilustrar o problema, considere uma sequência de chamadas à API com carimbos de data/hora 15, 35, 55 e 75 , conforme mostrado na tabela a seguir:

    Carimbo de data/hora da chamada à API ID do documento Propriedade foo Carimbo de data/hora do documento (_ts)
    15 A Vermelho 10
    35 B Azul 20
    35 A Laranja 30
    55 B Violet 50
    55 A Carmine 60
    75 B NeonBlue 70

    Comparando os resultados da API com a lista de alterações feitas no documento do Cosmos DB, você observará que elas não correspondem. O evento de atualização para o documento A, realçado na tabela de alterações no carimbo de data/hora 40, não aparece nos resultados da chamada à API.

    Para entender por que o evento não aparece, examinaremos as alterações no documento A entre as chamadas à API nos carimbos de data/hora 35 e 55. Entre essas duas chamadas, o documento A foi alterado duas vezes, da seguinte maneira:

    ID do documento Propriedade foo Evento Carimbo de data/hora do documento (_ts)
    A Rosa Atualizar 40
    A Carmine Atualizar 50

    Quando a chamada à API no carimbo de data/hora 55 é feita, a API do feed de alterações retorna a versão mais recente do documento. Nesse caso, a versão mais recente do documento A é a atualização no carimbo de data/hora 50, que é a atualização para a propriedade foo de Pink para Carmine.

    Devido a esse cenário, o conector de dados pode perder algumas alterações intermediárias do documento. Por exemplo, alguns eventos poderão ser perdidos se o serviço de conexão de dados ficar inativo por alguns minutos ou se a frequência de alterações de documento for maior do que a frequência de sondagem da API. No entanto, o estado mais recente de cada documento é capturado.

  • Não há suporte para a exclusão e recriação de um contêiner do Cosmos DB

    O Azure Data Explorer controla o feed de alterações verificando a "posição" em que está no feed. Isso é feito usando o token de continuação em cada partição física do contêiner. Quando um contêiner é excluído/recriado, esses tokens de continuação são inválidos e não são redefinidos: você deve excluir e recriar a conexão de dados.

Estimar o custo

Quanto o uso da conexão de dados do Cosmos DB afeta o uso das RUs (Unidades de Solicitação) do contêiner do Cosmos DB?

O conector invoca a API do Feed de Alterações do Cosmos DB em cada partição física do contêiner para até uma vez por segundo. Os seguintes custos estão associados a estas invocações:

Custo Description
Custos fixos Os custos fixos são de cerca de 2 RUs por partição física a cada segundo.
Custos variáveis Os custos variáveis são cerca de 2% das RUs usadas para gravar documentos, embora isso possa variar dependendo do seu cenário. Por exemplo, se você gravar 100 documentos em um contêiner do Cosmos DB, o custo da gravação desses documentos será de 1.000 RUs. O custo correspondente para usar o conector para ler esses documentos é de cerca de 2% do custo para gravá-los, aproximadamente 20 RUs.