API REST do Eventstream

As APIs REST do Microsoft Fabric permitem automatizar procedimentos e processos do Fabric, ajudando sua organização a concluir tarefas de forma mais eficiente e precisa. Ao automatizar esses fluxos de trabalho, você poderá reduzir erros, melhorar a produtividade e obter economia de custos em suas operações.

No Fabric, um item representa um conjunto de funcionalidades dentro de uma experiência específica. Por exemplo, Eventstream é um item na experiência de Inteligência em Tempo Real. Cada item no Fabric é definido por uma definição de item, um objeto que descreve a estrutura, o formato e os componentes principais que compõem o item.

Este artigo oferece um guia abrangente sobre como usar as APIs REST do Microsoft Fabric para criar e gerenciar itens de Eventstream em seu espaço de trabalho do Fabric. Você encontra especificações detalhadas para cada operação de API do Eventstream, juntamente com instruções para configurar e configurar suas chamadas à API.

Para obter uma visão geral completa das APIs REST do Microsoft Fabric, visite: Usar as APIs REST do Microsoft Fabric

APIs do Eventstream com suporte

Atualmente, o Eventstream dá suporte às seguintes APIs baseadas em definição:

APIs Descrição
Criar item de Eventstream com definição Use para criar um item Eventstream no workspace com informações detalhadas sobre sua topologia, incluindo origem, destinos, operadores e fluxos.
Obter definição de item de Eventstream Use para obter uma definição de item eventstream com informações detalhadas sobre sua topologia, incluindo origem, destinos, operadores e fluxos.
Atualizar definição de item de Eventstream Use para atualizar ou editar uma definição de item eventstream, incluindo origem, destinos, operadores e fluxos.

Para gerenciar seus itens de Eventstream usando operações CRUD, visite APIs REST do Fabric – Eventstream. Essas APIs dão suporte para as seguintes operações:

  • Criar o Eventstream
  • Excluir o Eventstream
  • Obter o Eventstream
  • Listar os Eventstreams
  • Atualizar o Eventstream

Como chamar a API eventstream?

Etapa 1: Autenticar em Fabric

Para trabalhar com APIs do Fabric, primeiro você precisará obter um token do Microsoft Entra para o serviço do Fabric e, em seguida, usar esse token no cabeçalho de autorização da chamada à API. Há duas opções para adquirir o token do Microsoft Entra.

opção 1: Obter token usando MSAL.NET

Se seu aplicativo precisar acessar as APIs do Fabric usando uma entidade de serviço, você pode utilizar a biblioteca MSAL.NET para obter um token de acesso. Siga o início rápido da Fabric API para criar um aplicativo de console C#, que adquire um token do Azure AD usando a biblioteca MSAL.Net e, em seguida, usa o HttpClient do C# para chamar a API Listar Workspaces.

Opção 2: Obter token usando o Portal do Fabric

Você pode usar seu token do Azure AD para autenticar e testar as APIs do Fabric. Entre no Portal do Fabric do locatário que deseja testar e pressione F12 para entrar no modo de desenvolvedor do navegador. No console que é aberto, execute:

powerBIAccessToken

Copie o token e cole-o em seu aplicativo.

Observação

Se o fluxo de eventos que você criar incluir qualquer fonte que use uma conexão de nuvem, verifique se a identidade usada para obter o token tem permissão para acessar essa conexão de nuvem, seja uma entidade de serviço ou um usuário.

Etapa 2: Preparar um corpo de fluxo de eventos em JSON

Crie uma carga JSON que será convertida em base64 na solicitação de API. A definição do item do Eventstream segue uma estrutura semelhante a um grafo e consiste nos seguintes componentes:

Campo Descrição
Fontes Fontes de dados que podem ser ingeridas no Eventstream para processamento. As fontes de dados com suporte incluem fontes de streaming do Azure, fontes de streaming de terceiros, CDC do banco de dados (captura de dados de alteração), eventos do Armazenamento de Blobs do Azure e eventos do sistema do Fabric.
Destinos Pontos de extremidade no Fabric para os quais os dados processados podem ser roteados, incluindo Lakehouse, Eventhouse, Activator e outros.
Operadores Processadores de eventos que lidam com fluxos de dados em tempo real, como Filtrar, Agregar, Agrupar Por e Ingressar.
Fluxos Fluxos de dados disponíveis para assinatura e análise no Hub em tempo real. Há dois tipos de fluxos: fluxos padrão e fluxos derivados.

Use os modelos de API no GitHub para ajudar a definir o corpo do Eventstream.

Você pode consultar este documento do Swagger para obter detalhes sobre cada propriedade da API e ele também orienta você na definição de um conteúdo da API Eventstream.

Modo de Ingestão Direta do Eventhouse

Ao usar o modo de ingestão direta do Eventhouse como destino no conteúdo da API Eventstream, siga estas diretrizes:

1. Especificar propriedades necessárias

Verifique se as seguintes propriedades estão definidas corretamente em seu corpo JSON:

  • connectionName – O nome da conexão Eventhouse.
  • mappingRuleName – A regra de mapeamento de ingestão para a tabela de destino.

Para localizar o correto connectionName:

  1. Navegue até o banco de dados KQL do Eventhouse no Fabric.
  2. Selecione fluxos de dados.
  3. Copie o elemento desejado connectionName.

Para mappingRuleName, você pode encontrar instruções detalhadas sobre como criar mapeamentos de ingestão no Mapeamento com referência de ingestão (ingestionMappingReference).

2. Configurar permissões do Service Principal

Se você se autenticar usando uma entidade de serviço, ela deverá ter:

  • Permissões do visualizador de banco de dados.
  • Permissões do Table ingestor

Você pode conceder essas permissões de duas maneiras:

Conceda essas permissões usando os seguintes comandos KQL na interface do usuário:

.add database ['yourDatabase'] viewers (@'aadapp=<clientid>;<tenantid>')
.add table yourTable ingestors (@'aadapp=<id>;<directoryid>')

Substitua clientid e tenantid pelos valores do principal do serviço.

Esses comandos concederão à entidade de serviço as permissões necessárias do plano de dados, permitindo que o Eventhouse crie a conexão e extraia dados do Eventstream. Para obter mais informações, consulte a visão geral das funções de segurança

Uma captura de tela da concessão da permissão banco de dados e tabela por meio da interface do usuário do Kusto.

Para obter mais detalhes sobre como definir um item Eventstream, confira a seção definição de item Eventstream.

Exemplo de definição de eventstream no JSON:

{
  "sources": [
    {
      "name": "SqlServerOnVmDbCdc",
      "type": "SQLServerOnVMDBCDC",
      "properties":
      {
        "dataConnectionId": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
        "tableName": ""
      }
    }
  ],
  "destinations": [
    {
      "name": "Lakehouse",
      "type": "Lakehouse",
      "properties":
      {
        "workspaceId": "bbbb1111-cc22-3333-44dd-555555eeeeee",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "schema": "",
        "deltaTable": "newTable",
        "minimumRows": 100000,
        "maximumDurationInSeconds": 120,
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "derivedStream"}]
    }
  ],
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "SqlServerOnVmDbCdc"}]
    },
    {
      "name": "derivedStream",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "GroupBy"}]
    }
  ],
  "operators": [
    {
      "name": "GroupBy",
      "type": "GroupBy",
      "inputNodes": [{"name": "myEventstream-stream"}],
      "properties":
      {
        "aggregations": [
          {
            "aggregateFunction": "Average",
            "column":
            {
              "expressionType": "ColumnReference",
              "node": null,
              "columnName": "payload",
              "columnPathSegments": [{"field": "ts_ms"}]
            },
            "alias": "AVG_ts_ms"
          }
        ],
        "groupBy": [],
        "window":
        {
          "type": "Tumbling",
          "properties":
          {
            "duration":
            {
              "value": 5,
              "unit": "Minute"
            },
            "offset":
            {
              "value": 1,
              "unit": "Minute"
            }
          }
        }
      }
    }
  ],
  "compatibilityLevel": "1.1"
}

Etapa 3: Criar uma cadeia de caracteres base64 do JSON eventstream

Use uma ferramenta como Codificação e Decodificação Base64 para converter o JSON Eventstream em cadeia de caracteres base64.

Uma captura de tela da codificação do JSON do Eventstream na cadeia de caracteres base64.

Etapa 4: Criar o corpo da solicitação de API

Use o JSON eventstream codificado em Base64 na etapa anterior como o conteúdo do corpo da solicitação de API.

Aqui está um exemplo de conteúdo com a cadeia de caracteres codificada em Base64:

{
 "definition": {
  "parts": [
   {
    "path": "eventstream.json",
    "payload": "ewogICJzb3VyY2VzIjogWwogICAgewogICAgICAibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMiLAogICAgICAidHlwZSI6ICJTUUxTZXJ2ZXJPblZNREJDREMiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiZGF0YUNvbm5lY3Rpb25JZCI6ICJhYWFhYWFhYS0wMDAwLTExMTEtMjIyMi1iYmJiYmJiYmJiYmIiLAogICAgICAgICJ0YWJsZU5hbWUiOiAiIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVzdGluYXRpb25zIjogWwogICAgewogICAgICAibmFtZSI6ICJMYWtlaG91c2UiLAogICAgICAidHlwZSI6ICJMYWtlaG91c2UiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAid29ya3NwYWNlSWQiOiAiYmJiYjExMTEtY2MyMi0zMzMzLTQ0ZGQtNTU1NTU1ZWVlZWVlIiwKICAgICAgICAiaXRlbUlkIjogImNjY2MyMjIyLWRkMzMtNDQ0NC01NWVlLTY2NjY2NmZmZmZmZiIsCiAgICAgICAgInNjaGVtYSI6ICIiLAogICAgICAgICJkZWx0YVRhYmxlIjogIm5ld1RhYmxlIiwKICAgICAgICAibWluaW11bVJvd3MiOiAxMDAwMDAsCiAgICAgICAgIm1heGltdW1EdXJhdGlvbkluU2Vjb25kcyI6IDEyMCwKICAgICAgICAiaW5wdXRTZXJpYWxpemF0aW9uIjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJKc29uIiwKICAgICAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAgICAgIHsKICAgICAgICAgICAgImVuY29kaW5nIjogIlVURjgiCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9LAogICAgICAiaW5wdXROb2RlcyI6IFt7Im5hbWUiOiAiZGVyaXZlZFN0cmVhbSJ9XQogICAgfQogIF0sCiAgInN0cmVhbXMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIiwKICAgICAgInR5cGUiOiAiRGVmYXVsdFN0cmVhbSIsCiAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAge30sCiAgICAgICJpbnB1dE5vZGVzIjogW3sibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMifV0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogImRlcml2ZWRTdHJlYW0iLAogICAgICAidHlwZSI6ICJEZXJpdmVkU3RyZWFtIiwKICAgICAgInByb3BlcnRpZXMiOgogICAgICB7CiAgICAgICAgImlucHV0U2VyaWFsaXphdGlvbiI6CiAgICAgICAgewogICAgICAgICAgInR5cGUiOiAiSnNvbiIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJlbmNvZGluZyI6ICJVVEY4IgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIkdyb3VwQnkifV0KICAgIH0KICBdLAogICJvcGVyYXRvcnMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIkdyb3VwQnkiLAogICAgICAidHlwZSI6ICJHcm91cEJ5IiwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIn1dLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiYWdncmVnYXRpb25zIjogWwogICAgICAgICAgewogICAgICAgICAgICAiYWdncmVnYXRlRnVuY3Rpb24iOiAiQXZlcmFnZSIsCiAgICAgICAgICAgICJjb2x1bW4iOgogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImV4cHJlc3Npb25UeXBlIjogIkNvbHVtblJlZmVyZW5jZSIsCiAgICAgICAgICAgICAgIm5vZGUiOiBudWxsLAogICAgICAgICAgICAgICJjb2x1bW5OYW1lIjogInBheWxvYWQiLAogICAgICAgICAgICAgICJjb2x1bW5QYXRoU2VnbWVudHMiOiBbeyJmaWVsZCI6ICJ0c19tcyJ9XQogICAgICAgICAgICB9LAogICAgICAgICAgICAiYWxpYXMiOiAiQVZHX3RzX21zIgogICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgImdyb3VwQnkiOiBbXSwKICAgICAgICAid2luZG93IjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJUdW1ibGluZyIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJkdXJhdGlvbiI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiA1LAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgIm9mZnNldCI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiAxLAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIF0sCiAgImNvbXBhdGliaWxpdHlMZXZlbCI6ICIxLjEiCn0=",
    "payloadType": "InlineBase64"
   },
   {
    "path": ".platform",
    "payload": "ewogICIkc2NoZW1hIjogImh0dHBzOi8vZGV2ZWxvcGVyLm1pY3Jvc29mdC5jb20vanNvbi1zY2hlbWFzL2ZhYnJpYy9naXRJbnRlZ3JhdGlvbi9wbGF0Zm9ybVByb3BlcnRpZXMvMi4wLjAvc2NoZW1hLmpzb24iLAogICJtZXRhZGF0YSI6IHsKICAgICJ0eXBlIjogIkV2ZW50c3RyZWFtIiwKICAgICJkaXNwbGF5TmFtZSI6ICJhbGV4LWVzMSIKICB9LAogICJjb25maWciOiB7CiAgICAidmVyc2lvbiI6ICIyLjAiLAogICAgImxvZ2ljYWxJZCI6ICIwMDAwMDAwMC0wMDAwLTAwMDAtMDAwMC0wMDAwMDAwMDAwMDAiCiAgfQp9",
    "payloadType": "InlineBase64"
   }
  ]
 }
}

Etapa 5: Criar um item eventstream usando a API

Em seu aplicativo, envie uma solicitação para criar um item Eventstream com a cadeia de caracteres codificada em Base64 no conteúdo.

exemplo do PowerShell:

$evenstreamAPI = "https://api.fabric.microsoft.com/v1/workspaces/$workspaceId/items" 

## Invoke the API to create the Eventstream
Invoke-RestMethod -Headers $headerParams -Method POST -Uri $evenstreamAPI -Body ($body) -ContentType "application/json"

Definição de item de fluxo de evento

A definição do item Eventstream tem uma estrutura semelhante a um grafo que consiste em quatro componentes: fontes, destinos, operadores e fluxos.

Fontes

Para definir uma fonte Eventstream no corpo da API, verifique se cada campo e propriedade estão especificados corretamente de acordo com a tabela.

Campo Tipo Descrição Requisito Valores permitidos / Formato
id Cadeia de caracteres (UUID) O identificador exclusivo da origem, gerado pelo sistema. Opcional em CREATE, necessário em UPDATE Formato UUID
name fio Um nome exclusivo para a origem, usado para identificá-lo no Eventstream. Obrigatório Qualquer cadeia de caracteres válida
type Cadeia de caracteres (enumeração) Especifica o tipo de fonte. Deve corresponder a um dos valores predefinidos. Obrigatório AmazonKinesis, AmazonMSKKafka, ApacheKafka, AzureCosmosDBCDC, AzureBlobStorageEvents, AzureEventHub, AzureIoTHub, AzureSQLDBCDC, AzureSQLMIDBCDC, ConfluentCloud, CustomEndpoint, FabricCapacityUtilizationEvents, GooglePubSub, MySQLCDC, PostgreSQLCDC, SampleData, FabricWorkspaceItemEvents, FabricJobEvents, FabricOneLakeEvents
properties Objeto Outras configurações específicas para o tipo de origem selecionado. Obrigatório Exemplo de tipo AzureEventHub: dataConnectionId,consumerGroupName e inputSerialization

Exemplo de origem do Eventstream no corpo da API:

{
  "sources": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "AzureEventHubSource",
      "type": "AzureEventHub",
      "properties":
      {
        "dataConnectionId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "consumerGroupName": "$Default",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      }
    }
  ]
}

Destinos

Para definir um destino do Eventstream no corpo da API, verifique se cada campo e propriedade estão especificados corretamente de acordo com a tabela.

Campo Tipo Descrição Requisito Valores permitidos / Formato
id Cadeia de caracteres (UUID) O identificador exclusivo do destino, gerado pelo sistema. Opcional em CREATE, necessário em UPDATE Formato UUID
name fio Um nome exclusivo para o destino, usado para identificá-lo no Eventstream. Obrigatório Qualquer cadeia de caracteres válida
type Cadeia de caracteres (enumeração) Especifica o tipo de destino. Deve corresponder a um dos valores predefinidos. Obrigatório "Activator", "CustomEndpoint", , "Eventhouse""Lakehouse"
properties Objeto Outras configurações específicas para o tipo de destino selecionado. Obrigatório Exemplo de tipo Eventhouse: "dataIngestionMode", "workspaceId", "itemId" e "databaseName"
inputNodes Matriz Uma referência aos nós de entrada para o destino, como o nome do Eventstream ou um nome de operador. Obrigatório Exemplo: eventstream-1

Novamente, se você estiver usando o destino do modo de ingestão direta do Eventhouse, verifique se a connectionName e mappingRuleName propriedade estão especificadas corretamente. Para encontrar o connectionName correto, navegue até o banco de dados KQL do Eventhouse, selecione Data streams e copie o desejado connectionName. Para obter instruções detalhadas sobre como criar mapeamentos de ingestão, consulte Mapeamento com ingestionMappingReference.


Exemplo de origem do Eventstream no corpo da API:

{
  "destinations": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "EventhouseDestination",
      "type": "Eventhouse",
      "properties":
      {
        "dataIngestionMode": "ProcessedIngestion",
        "workspaceId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "databaseName": "myeventhouse",
        "tableName": "mytable",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "eventstream-1"}]
    }
  ]
}

Operadores

Para definir um operador do Eventstream no corpo da API, verifique se cada campo e propriedade estão especificados corretamente de acordo com a tabela.

Campo Tipo Descrição Requisito Valores permitidos / Formato
name fio Um nome exclusivo para o operador. Obrigatório Qualquer cadeia de caracteres válida
type Cadeia de caracteres (enumeração) Especifica o tipo de operador. Deve corresponder a um dos valores predefinidos. Obrigatório "Filter", "Join", "ManageFields", "Aggregate", , "GroupBy", "Union", "Expand"
properties Objeto Outras configurações específicas para o tipo de operador selecionado. Obrigatório Exemplo de tipo Filter: "conditions"
inputNodes Matriz Uma lista de referências aos nós de entrada do operador. Obrigatório Exemplo: eventstream-1
inputSchemas Matriz Uma lista de referências aos nós de entrada do operador. Opcional Exemplo de tipo Filter: "schema"

Exemplo do operador do Eventstream no corpo da API:

{
  "operators": [
    {
      "name": "FilterName",
      "type": "Filter",
      "inputNodes": [{"name": "eventstream-1"}],
      "properties":
      {
        "conditions": [
          {
            "column":
            {
              "node": "nodeName",
              "columnName": "columnName",
              "columnPath": ["path","to","column"]
            },
            "operator": "Equals",
            "value":
            {
              "dataType": "nvarchar(max)",
              "value": "stringValue"
            }
          }
        ]
      }
    }
  ]
}

Fluxos

Para definir um fluxo no corpo da API, verifique se cada campo e propriedade estão especificados corretamente de acordo com a tabela.

Campo Tipo Descrição Requisito Valores permitidos / Formato
id Cadeia de caracteres (UUID) O identificador exclusivo do fluxo, gerado pelo sistema. Opcional Formato UUID
name fio Um nome exclusivo para o fluxo. Obrigatório Qualquer cadeia de caracteres válida
type Cadeia de caracteres (enumeração) Especifica o tipo de fluxo. Deve corresponder a um dos valores predefinidos. Obrigatório "DefaultStream", "DerivedStream"
properties Objeto Outras configurações específicas para o tipo de fluxo selecionado. Obrigatório Exemplo de tipo Filter: "conditions"
inputNodes Matriz Uma lista de referências aos nós de entrada do fluxo. Opcional Exemplo: [], "eventstream-1"

Exemplo de fluxo no corpo da API:

{
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "sourceName"}]
    },
    {
      "name": "DerivedStreamName",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "FilterName"}]
    }
  ]
}