Ingerir dados do Azure Cosmos DB no Azure Data Explorer
O Azure Data Explorer dá suporte à ingestão de dados do Azure Cosmos DB para NoSQL usando um feed de alterações. A conexão de dados do feed de alterações do Cosmos DB é um pipeline de ingestão que escuta o feed de alterações do Cosmos DB e ingere os dados na sua tabela do Data Explorer. O feed de alterações escuta documentos novos e atualizados, mas não registra exclusões. Para obter informações gerais sobre a ingestão de dados no Azure Data Explorer, confira Visão geral da ingestão de dados do Azure Data Explorer.
Cada conexão de dados escuta um contêiner específico do Cosmos DB e ingere dados em uma tabela especificada (mais de uma conexão pode ingerir em uma única tabela). O método de ingestão dá suporte à ingestão de streaming (quando habilitado) e à ingestão na fila.
Neste artigo, você aprenderá a configurar uma conexão de dados do feed de alterações do Cosmos DB para ingerir dados no Azure Data Explorer com a Identidade Gerenciada do Sistema. Examine as considerações antes de iniciar.
Use as seguintes etapas para configurar um conector:
Etapa 1: Escolher uma tabela do Azure Data Explorer e configurar seu mapeamento de tabela
Etapa 2: Criar uma conexão de dados do Cosmos DB
Etapa 3: Testar a conexão de dados
Pré-requisitos
- Uma assinatura do Azure. Criar uma conta gratuita do Azure.
- Um cluster e um banco de dados do Azure Data Explorer. Criar um cluster e um banco de dados.
- Um contêiner de uma conta do Cosmos DB para NoSQL.
- Se a sua conta do Cosmos DB bloquear o acesso à rede, por exemplo, usando um ponto de extremidade privado, você deverá criar um ponto de extremidade privado gerenciado para a conta do Cosmos DB. Isso é necessário para que o cluster invoque a API do feed de alterações.
Etapa 1: Escolher uma tabela do Azure Data Explorer e configurar seu mapeamento de tabela
Antes de criar uma conexão de dados, crie uma tabela em que você armazenará os dados ingeridos e aplicará um mapeamento que corresponde ao esquema no contêiner do Cosmos DB de origem. Se o cenário exigir mais do que um mapeamento simples de campos, você poderá usar políticas de atualização para transformar e mapear dados ingeridos do feed de alterações.
Este é um esquema de exemplo de um item no contêiner do Cosmos DB:
{
"id": "17313a67-362b-494f-b948-e2a8e95e237e",
"name": "Cousteau",
"_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
"_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
"_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
"_attachments": "attachments/",
"_ts": 1651131409
}
Use as seguintes etapas para criar uma tabela e aplicar um mapeamento de tabela:
Na interface do usuário da Web do Azure Data Explorer, no menu de navegação esquerdo, selecione Consulta e selecione o banco de dados no qual deseja criar a tabela.
Execute o comando a seguir para criar uma tabela chamada TestTable.
.create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
Execute o comando a seguir para criar o mapeamento de tabela.
O comando mapeia propriedades personalizadas de um documento JSON do Cosmos DB para colunas na tabela TestTable, da seguinte maneira:
Propriedade do Cosmos DB Coluna de tabela Transformação id ID Nenhum name Nome Nenhum _ts _ts Nenhum _ts _timestamp Usa DateTimeFromUnixSeconds
para transformar _ts (segundos UNIX) em _timestamp (datetime
))Observação
É recomendável usar as seguintes colunas de carimbo de data/hora:
- _ts: use esta coluna para reconciliar dados com o Cosmos DB.
- _timestamp: use esta coluna para executar filtros de tempo eficientes em suas consultas Kusto. Para obter mais informações, confira a Melhor prática para consulta.
.create table TestTable ingestion json mapping "DocumentMapping" ``` [ {"column":"Id","path":"$.id"}, {"column":"Name","path":"$.name"}, {"column":"_ts","path":"$._ts"}, {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"} ] ```
Transformar e mapear dados com políticas de atualização
Se o cenário exigir mais do que um mapeamento simples de campos, você poderá usar políticas de atualização para transformar e mapear dados ingeridos do feed de alterações.
Políticas de atualização são uma maneira de transformar dados à medida que são ingeridos em sua tabela. Elas são escritas na Linguagem de Consulta Kusto e são executadas no pipeline de ingestão. Podem ser usadas para transformar dados de uma ingestão de feed de alterações do Cosmos DB, como nos seguintes cenários:
- Seus documentos contêm matrizes que seriam mais fáceis de consultar se fossem transformados em várias linhas usando o operador
mv-expand
. - Você deseja filtrar documentos. Por exemplo, você pode filtrar documentos por tipo usando o operador
where
. - Você tem uma lógica complexa que não pode ser representada em um mapeamento de tabela.
Para obter informações sobre como criar e gerenciar políticas de atualização, consulte Visão geral da política de atualização.
Etapa 2: Criar uma conexão de dados do Cosmos DB
Você pode usar os seguintes métodos para criar o conector de dados:
No portal do Azure, acesse a página de visão geral do cluster e selecione a guia Introdução.
No bloco Ingestão de dados, selecione Criar conexão de dados>Cosmos DB.
No painel Criar conexão de dados do Cosmos DB, preencha o formulário com as informações na tabela:
Campo Descrição Nome do banco de dados Escolha o banco de dados do Azure Data Explorer no qual você deseja ingerir dados. Nome da conexão de dados Especifique um nome para a conexão de dados. Assinatura Selecione a assinatura que contém sua conta do Cosmos DB for NoSQL. Conta do Cosmos DB Escolha a conta do Cosmos DB da qual você deseja ingerir dados. Banco de dados SQL Escolha o banco de dados do Cosmos DB do qual você deseja ingerir dados. Contêiner de SQL Escolha o contêiner do Cosmos DB do qual você deseja ingerir dados. Nome da tabela Especifique o nome da tabela do Azure Data Explorer para o qual você deseja ingerir dados. Nome do mapeamento Opcionalmente, especifique o nome de mapeamento a ser usado para a conexão de dados. Opcionalmente, em Configurações avançadas, faça o seguinte:
Especifique a Data de início da recuperação do evento. Esse é o momento em que o conector começará a ingerir dados. Se você não especificar uma hora, o conector começará a ingerir dados a partir do momento em que você criar a conexão de dados. O formato de data recomendado é o padrão ISO 8601 UTC, especificado da seguinte maneira:
yyyy-MM-ddTHH:mm:ss.fffffffZ
.Selecione Atribuída ao usuário e selecione a identidade. Por padrão, a identidade gerenciada Atribuída ao sistema é usada pela conexão. Se necessário, você pode usar uma identidade Atribuída ao usuário.
Selecione Criar para criar a conexão de dados.
Etapa 3: Testar a conexão de dados
No contêiner do Cosmos DB, insira o seguinte documento:
{ "name":"Cousteau" }
Na interface do usuário da Web do Azure Data Explorer, execute a seguinte consulta:
TestTable
O conjunto de resultados deve ser parecido com a seguinte imagem:
Observação
O Azure Data Explorer tem uma política de agregação (envio em lote) para a ingestão de dados na fila, criada para otimizar o processo de ingestão. A política de envio em lote padrão está configurada para lacrar um lote quando uma das seguintes condições for verdadeira para o lote: um tempo de atraso máximo de cinco minutos, tamanho total de 1 GB ou 1000 blobs. Portanto, você pode ter alguma latência. Para obter mais informações, confira a política de envio em lote. Para reduzir a latência, configure sua tabela para dar suporte ao streaming. Confira política de streaming.
Considerações
As seguintes considerações se aplicam ao feed de alterações do Cosmos DB:
O feed de alterações não expõe eventos de exclusão.
O feed de alterações do Cosmos DB inclui apenas documentos novos e atualizados. Se você precisar de informações sobre documentos excluídos, poderá configurar o feed usando um marcador flexível para marcar um documento do Cosmos DB como excluído. Uma propriedade é adicionada a eventos de atualização que indicam se um documento foi excluído. Em seguida, você pode usar o operador
where
em suas consultas para filtrá-las.Por exemplo, se você mapear a propriedade excluída para uma coluna de tabela chamada IsDeleted, poderá filtrar documentos excluídos com a seguinte consulta:
TestTable | where not(IsDeleted)
O feed de alterações expõe apenas a atualização mais recente de um documento.
Para entender a ramificação da segunda consideração, examine o seguinte cenário:
Um contêiner do Cosmos DB contém documentos A e B. As alterações em uma propriedade chamada foo aparecem na seguinte tabela:
ID do Documento Propriedade foo Evento Carimbo de data/hora do documento (_ts) Um Vermelho Criação 10 B Azul Criação 20 Um Laranja Atualizar 30 A Pink Atualizar 40 B Violeta Atualizar 50 Um Carmine Atualizar 50 B NeonBlue Atualizar 70 A API do feed de alterações é sondada pelo conector de dados em intervalos regulares, normalmente a cada poucos segundos. Cada sondagem contém alterações que ocorreram no contêiner entre chamadas, mas apenas a versão mais recente da alteração por documento.
Para ilustrar o problema, considere uma sequência de chamadas à API com carimbos de data/hora 15, 35, 55 e 75, conforme aparece na seguinte tabela:
Carimbo de data/hora da chamada à API ID do Documento Propriedade foo Carimbo de data/hora do documento (_ts) 15 Um Vermelho 10 35 B Azul 20 35 Um Laranja 30 55 B Violeta 50 55 Um Carmine 60 75 B NeonBlue 70 Comparando os resultados da API com a lista de alterações feitas no documento do Cosmos DB, você observará que eles não correspondem. O evento de atualização para o documento A, realçado na tabela de alterações no carimbo de data/hora 40, não aparece nos resultados da chamada à API.
Para entender por que o evento não aparece, examinaremos as alterações no documento A entre as chamadas à API nos carimbos de data/hora 35 e 55. Entre essas duas chamadas, o documento A foi alterado duas vezes, da seguinte maneira:
ID do Documento Propriedade foo Evento Carimbo de data/hora do documento (_ts) Um Rosa Atualizar 40 Um Carmine Atualizar 50 Quando a chamada à API no carimbo de data/hora 55 é feita, a API do feed de alterações retorna a versão mais recente do documento. Nesse caso, a versão mais recente do documento A é a atualização no carimbo de data/hora 50, que é a atualização para a propriedade foo de Pink para Carmine.
Devido a esse cenário, o conector de dados pode perder algumas alterações de documento intermediárias. Por exemplo, alguns eventos poderão ser perdidos se o serviço de conexão de dados ficar inativo por alguns minutos ou se a frequência de alterações de documento for maior que a frequência de sondagem da API. No entanto, o estado mais recente de cada documento é capturado.
Não há suporte para a exclusão e recriação de um contêiner do Cosmos DB
O Azure Data Explorer controla o feed de alterações verificando a "posição" em que está no feed. Isso é feito usando o token de continuação em cada partição física do contêiner. Quando um contêiner é excluído/recriado, esses tokens de continuação são inválidos e não são redefinidos: você deve excluir e recriar a conexão de dados.
Estimar o custo
Quanto o uso da conexão de dados do Cosmos DB afeta o uso das RUs (unidades de solicitação) do contêiner do Cosmos DB?
O conector invoca a API do feed de alterações do Cosmos DB em cada partição física do contêiner, até uma vez por segundo. Os seguintes custos são associados a essas invocações:
Custo | Descrição |
---|---|
Custos fixos | Os custos fixos são de aproximadamente 2 RUs por partição física a cada segundo. |
Custos variáveis | Os custos variáveis são cerca de 2% das RUs usadas para gravar documentos, embora isso possa variar dependendo do seu cenário. Por exemplo, se você gravar 100 documentos em um contêiner do Cosmos DB, o custo de gravação desses documentos será de 1.000 RUs. O custo correspondente para usar o conector para ler esses documentos é de aproximadamente 2% do custo para gravá-los, aproximadamente 20 RUs. |