Esercitazione: Eseguire Funzioni di Azure dai processi di Analisi di flusso di Azure
In questa esercitazione viene creato un processo di Analisi di flusso di Azure che legge gli eventi da Hub eventi di Azure, esegue una query sui dati dell'evento e quindi richiama una funzione di Azure, che scrive in un'istanza di cache di Azure per Redis.
Nota
- È possibile eseguire Funzioni di Azure da Analisi di flusso di Azure configurando Funzioni come uno dei sink (output) nel processo di Analisi di flusso. Funzioni offre un'esperienza di calcolo on demand guidata dagli eventi che consente di implementare il codice attivato da eventi generati nei servizi di Azure o in servizi di terze parti. La possibilità offerta da Funzioni di rispondere ai trigger la rende l'output naturale per i processi di Analisi di flusso.
- Analisi di flusso richiama Funzioni tramite trigger HTTP. L'adattatore di output di Funzioni consente agli utenti di connettere Funzioni ad Analisi di flusso, in modo che gli eventi possano essere attivati in base alle query di Analisi di flusso.
- La connessione a Funzioni di Azure all'interno di una rete virtuale da un processo di Analisi di flusso in esecuzione in un cluster multi-tenant non è supportata.
In questa esercitazione apprenderai a:
- Creare un'istanza di Hub eventi di Azure
- Creare un'istanza di Azure Cache per Redis
- Crea una Funzione di Azure
- Creare un processo di Analisi di flusso.
- Configurare l'hub eventi come input e funzione come output
- Eseguire il processo di Analisi di flusso
- Controllare i risultati in Azure Cache per Redis
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
Prima di iniziare, assicurarsi di aver completato i passaggi seguenti:
- Se non si ha una sottoscrizione di Azure, creare un account gratuito.
- Scaricare l'app generatore di eventi di chiamata telefonica, TelcoGenerator.zip dall'Area download Microsoft o ottenere il codice sorgente da GitHub.
Accedere ad Azure
Accedere al portale di Azure.
Creare un hub eventi
È necessario inviare alcuni dati di esempio a un hub eventi prima che Analisi di flusso possa analizzare il flusso di dati delle chiamate fraudolente. In questa esercitazione si inviano dati ad Azure usando Hub eventi di Azure.
Usare la procedura seguente per creare un hub eventi e inviare i dati delle chiamate a tale hub eventi:
Accedere al portale di Azure.
Selezionare Tutti i servizi nel menu a sinistra, selezionare Internet delle cose, passare il mouse su Hub eventi e quindi fare clic sul pulsante + (Aggiungi).
Nella pagina Crea spazio dei nomi, seguire questa procedura:
Selezionare una sottoscrizione di Azure in cui si vuole creare l'hub eventi.
In Gruppo di risorse selezionare Crea nuovo e immettere un nome per il gruppo di risorse. Lo spazio dei nomi di Hub eventi viene creato in questo gruppo di risorse.
Per Nome spazio dei nomi immettere un nome univoco per lo spazio dei nomi di Hub eventi.
In Località selezionare l'area in cui si vuole creare lo spazio dei nomi.
Per Piano tariffario selezionare Standard.
Selezionare Rivedi e crea nella parte inferiore della pagina.
Nella pagina Rivedi e crea della creazione guidata dello spazio dei nomi selezionare Crea nella parte inferiore della pagina dopo aver esaminato tutte le impostazioni.
Dopo aver distribuito correttamente lo spazio dei nomi, selezionare Vai alla risorsa per passare alla pagina Spazio dei nomi di Hub eventi.
Nella pagina Spazio dei nomi di Hub eventi selezionare +Hub eventi sulla barra dei comandi.
Nella pagina Crea hub eventi immettere un nome per l'hub eventi. Impostare Conteggio partizioni su 2. Usare le opzioni predefinite nelle impostazioni rimanenti e selezionare Rivedi e crea.
Nella pagina Rivedi e crea selezionare Crea nella parte inferiore della pagina. Attendere il completamento della distribuzione.
Concedere l'accesso all'hub eventi e ottenere una stringa di connessione
Affinché un'applicazione possa inviare dati ad Hub eventi di Azure, è necessario che l'hub eventi abbia criteri che consentono l'accesso. I criteri di accesso generano una stringa di connessione che include informazioni di autorizzazione.
Nella pagina Spazio dei nomi di Hub eventi selezionare Criteri di accesso condiviso nel menu a sinistra.
Selezionare RootManageSharedAccessKey nell'elenco dei criteri.
Selezionare quindi il pulsante Copia accanto a Stringa di connessione - chiave primaria.
Incollare la stringa di connessione in un editor di testo. La stringa sarà necessaria nella sezione seguente.
La stringa di connessione è simile a quanto segue:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
Si noti che il stringa di connessione contiene più coppie chiave-valore separate da punti e virgola: Endpoint, SharedAccessKeyName e SharedAccessKey.
Avviare l'applicazione generatore eventi
Prima di avviare l'app TelcoGenerator, configurarla per inviare i dati all'istanza di Hub eventi di Azure creata in precedenza.
Estrarre il contenuto del file TelcoGenerator.zip.
Aprire il
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
file in un editor di testo di propria scelta C'è più di un.config
file, quindi assicurarsi di aprire quello corretto.Aggiornare l'elemento
<appSettings>
nel file di configurazione con i dettagli seguenti:- Impostare il valore della chiave EventHubName sul valore di EntityPath alla fine del stringa di connessione.
- Impostare il valore della chiave Microsoft.ServiceBus.ConnectionString sul stringa di connessione sullo spazio dei nomi . Se si usa un stringa di connessione per un hub eventi, non uno spazio dei nomi, rimuovere
EntityPath
il valore (;EntityPath=myeventhub
) alla fine. Non dimenticare di rimuovere il punto e virgola che precede il valore entityPath.
Salvare il file.
Aprire quindi una finestra di comando e passare alla cartella in cui è stata decompressa l'app TelcoGenerator. Immettere quindi il comando seguente:
.\telcodatagen.exe 1000 0.2 2
Questo comando accetta i parametri seguenti:
- Numero di record di dati delle chiamate all'ora.
- Percentuale di probabilità di frode, ovvero frequenza con cui l'app deve simulare una chiamata fraudolenta. Il valore 0,2 indica che circa il 20% dei record di chiamata sembra fraudolento.
- Durata in ore, ovvero numero di ore per cui l'app deve essere eseguita. È anche possibile arrestare l'app in qualsiasi momento terminando il processo (CTRL+C) dalla riga di comando.
Dopo alcuni secondi, l'app inizierà a visualizzare i record delle chiamate mentre ne esegue l'invio all'hub eventi. I dati delle telefonate contengono i campi seguenti:
Record Definizione CallrecTime Timestamp dell'ora di inizio della chiamata. SwitchNum Commutatore telefonico usato per la connessione della chiamata. Per questo esempio i commutatori sono stringhe che rappresentano il paese/area di origine (Stati Uniti, Cina, Regno Unito, Germania o Australia). CallingNum Numero di telefono del chiamante. CallingIMSI Codice IMSI (International Mobile Subscriber Identity). Identificatore univoco del chiamante. CalledNum Numero di telefono del destinatario della chiamata. CalledIMSI Codice IMSI (International Mobile Subscriber Identity). Identificatore univoco del destinatario della chiamata.
Creare un processo di Analisi di flusso.
Dopo aver creato un flusso di eventi di chiamata, è possibile creare un processo di Analisi di flusso che legge i dati dall'hub eventi.
- Per creare un processo di Analisi di flusso, passare al portale di Azure.
- Selezionare Crea una risorsa e cercare Processo di Analisi di flusso. Selezionare il riquadro Processo di Analisi di flusso e quindi Crea.
- Nella pagina Nuovo processo di Analisi di flusso seguire questa procedura:
Per Sottoscrizione selezionare la sottoscrizione che contiene lo spazio dei nomi di Hub eventi.
In Gruppo di risorse selezionare il gruppo di risorse creato in precedenza.
Nella sezione Dettagli istanza immettere un nome univoco per il processo di Analisi di flusso.
In Area selezionare l'area in cui si vuole creare il processo di Analisi di flusso. È consigliabile inserire il processo e l'hub eventi nella stessa area per ottenere prestazioni ottimali e in modo da non pagare per trasferire i dati tra aree.
Per Ambiente di hosting< selezionare Cloud se non è già selezionato. Per la distribuzione dei processi di Analisi di flusso è possibile scegliere tra Cloud o Edge. Il cloud consente di eseguire la distribuzione nel cloud di Azure e Edge consente di eseguire la distribuzione in un dispositivo IoT Edge.
Per Unità di streaming selezionare 1. Le unità di streaming rappresentano le risorse di calcolo necessarie per eseguire un processo. Il valore predefinito di questa impostazione è 1. Per informazioni sul ridimensionamento delle unità di streaming, vedere Informazioni sulle unità di flusso e su come modificarle.
Selezionare Rivedi e crea nella parte inferiore della pagina.
- Nella pagina Rivedi e crea rivedere le impostazioni e quindi selezionare Crea per creare il processo di Analisi di flusso.
- Dopo aver distribuito il processo, selezionare Vai alla risorsa per passare alla pagina del processo di Analisi di flusso.
Configurare l'input del processo
Il passaggio successivo consiste nel definire un'origine di input da cui il processo può leggere i dati usando l'hub eventi creato nella sezione precedente.
Nella pagina processo di Analisi di flusso, nella sezione Topologia processo del menu a sinistra selezionare Input.
Nella pagina Input selezionare + Aggiungi input e Hub eventi.
Nella pagina Hub eventi seguire questa procedura:
Per Alias di input immettere CallStream. L'alias di input è un nome descrittivo per identificare l'input. L'alias di input può contenere solo caratteri alfanumerici, trattini e caratteri di sottolineatura e deve avere una lunghezza compresa tra 3 e 63 caratteri.
Per Sottoscrizione selezionare la sottoscrizione di Azure in cui è stato creato l'hub eventi. L'hub eventi può trovarsi nella stessa sottoscrizione del processo di Analisi di flusso o in una sottoscrizione diversa.
Per Spazio dei nomi di Hub eventi selezionare lo spazio dei nomi di Hub eventi creato nella sezione precedente. Tutti gli spazi dei nomi disponibili nella sottoscrizione corrente sono elencati nell'elenco a discesa.
Per Nome hub eventi selezionare l'hub eventi creato nella sezione precedente. Tutti gli hub eventi disponibili nello spazio dei nomi selezionato sono elencati nell'elenco a discesa.
Per gruppo di consumer dell'hub eventi mantenere selezionata l'opzione Crea nuova in modo che venga creato un nuovo gruppo di consumer nell'hub eventi. È consigliabile usare un gruppo di consumer distinto per ogni processo di Analisi di flusso. Se non viene specificato alcun gruppo di consumer, il processo di Analisi di flusso usa il
$Default
gruppo di consumer. Quando un processo contiene un self-join o ha più input, alcuni input potrebbero essere letti da più di un lettore. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer.Per Modalità di autenticazione selezionare Stringa di connessione. È più semplice testare l'esercitazione con questa opzione.
Per Nome criterio hub eventi selezionare Usa esistente e quindi selezionare il criterio creato in precedenza.
Selezionare Salva nella parte inferiore della pagina.
Creare un'istanza di Azure Cache per Redis
Creare una cache in Azure Cache per Redis seguendo la procedura descritta in Creare una cache.
Dopo aver creato la cache, in Impostazioniselezionare Chiavi di accesso. Annotare la Stringa di connessione primaria.
Creare una funzione in Funzioni di Azure che possa scrivere dati in Azure Cache per Redis
Vedere la sezione Creare un'app per le funzioni della documentazione relativa a Funzioni. Questo esempio è stato compilato in:
- Impostare la versione del runtime di Funzioni di Azure versione 4
- .NET 6.0
- StackExchange.Redis 2.2.8
Creare un'app per le funzioni HttpTrigger predefinita in Visual Studio Code seguendo questa esercitazione. Vengono usate le informazioni seguenti: linguaggio:
C#
, runtime:.NET 6
(in funzione v4), modello:HTTP trigger
.Installare la libreria client Redis eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:
dotnet add package StackExchange.Redis --version 2.2.88
Aggiungere gli
RedisConnectionString
elementi eRedisDatabaseIndex
nellaValues
sezione dilocal.settings.json
, compilando il stringa di connessione del server di destinazione:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
L'indice del database Redis è il numero compreso tra 0 e 15 che identifica il database nell'istanza di .
Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Quando Analisi di flusso riceve l'eccezione "Entità richiesta HTTP troppo grande" dalla funzione, riduce le dimensioni dei batch inviati alle funzioni. Il codice seguente verifica che Analisi di flusso non invii batch troppo grandi. Assicurarsi che i valori relativi alle dimensioni massime e al numero massimo di batch usati nella funzione siano conformi ai valori inseriti nel portale di Analisi di flusso.
È ora possibile pubblicare la funzione in Azure.
Aprire la funzione nel portale di Azure e impostare le impostazioni dell'applicazione per
RedisConnectionString
eRedisDatabaseIndex
.
Aggiornare il processo di Analisi di flusso con la funzione come output
Nel portale di Azure aprire il processo di Analisi di flusso.
Passare alla funzione e selezionare Panoramica>Output>Aggiungi. Per aggiungere un nuovo output, selezionare Funzione di Azure per l'opzione di sink. L'adattatore di output di Funzioni ha le proprietà seguenti:
Nome proprietà Descrizione Alias di output Nome descrittivo usato nella query del processo per fare riferimento all'output. Opzione di importazione È possibile usare la funzione dalla sottoscrizione corrente oppure specificare manualmente le impostazioni se la funzione si trova in un'altra sottoscrizione. App per le funzioni Nome dell'app Funzioni. Funzione Nome della funzione nell'app Funzioni (nome della funzione di run.csx). Dimensioni massime batch Imposta le dimensioni massime, in byte, per ogni batch di output inviato alla funzione. Per impostazione predefinita, questo valore è impostato su 262.144 byte (256 KB). Numero massimo di batch Specifica il numero massimo di eventi in ogni batch inviato alla funzione. Il valore predefinito è 100. Questa proprietà è facoltativa. Chiave Consente di usare una funzione di un'altra sottoscrizione. Specificare il valore della chiave per accedere alla funzione. Questa proprietà è facoltativa. Specificare un nome per l'alias di output. In questa esercitazione è denominato saop1, ma è possibile usare qualsiasi nome di propria scelta. Specificare gli altri dettagli.
Aprire il processo di Analisi di flusso e aggiornare la query nel modo seguente.
Importante
Lo script di esempio seguente presuppone che sia stato usato CallStream per il nome di input e saop1 per il nome di output. Se sono stati usati nomi diversi, non dimenticare di aggiornare la query.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
Avviare l'applicazione telcodatagen.exe eseguendo il comando seguente nella riga di comando. Il comando usa il formato
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.telcodatagen.exe 1000 0.2 2
Avviare il processo di Analisi di flusso.
Nella pagina Monitoraggio per la funzione di Azure si noterà che la funzione viene richiamata.
Nella pagina cache di Azure per Redis cache selezionare Metriche nel menu a sinistra, aggiungere la metrica di scrittura della cache e impostare la durata sull'ultima ora. Viene visualizzato il grafico simile all'immagine seguente.
Controllare i risultati in Azure Cache per Redis
Ottenere la chiave dai log di Funzioni di Azure
Ottenere prima di tutto la chiave per un record inserito in cache di Azure per Redis. Nel codice la chiave viene calcolata nella funzione di Azure, come illustrato nel frammento di codice seguente:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Passare alla portale di Azure e trovare l'app Funzioni di Azure.
Selezionare Funzioni nel menu sinistro.
Selezionare HTTPTrigger1 nell'elenco delle funzioni.
Selezionare Monitoraggio nel menu a sinistra.
Passare alla scheda Log .
Prendere nota di una chiave dal messaggio informativo, come illustrato nello screenshot seguente. Usare questa chiave per trovare il valore in cache di Azure per Redis.
Usare la chiave per trovare il record in cache di Azure per Redis
Accedere al portale di Azure e individuare Azure Cache per Redis. Selezionare Console.
Usare i comandi di Azure Cache per Redis per verificare che i dati siano in Azure Cache per Redis. Il comando accetta il formato Get {key}.) Usare la chiave copiata dai log di monitoraggio per la funzione di Azure (nella sezione precedente).
Ottenere "KEY-FROM-THE-PREVIOUS-SECTION"
Questo comando dovrebbe visualizzare il valore relativo alla chiave specificata:
Gestione degli errori e tentativi
Se si verifica un errore durante l'invio di eventi a Funzioni di Azure, Analisi di flusso ritenta la maggior parte delle operazioni. Tutte le eccezioni HTTP vengono ritentate fino all'esito positivo, ad eccezione dell'errore HTTP 413 (entità troppo grande). Un errore di tipo entità troppo grande viene considerato un errore di dati soggetto al criterio Riprova o Rimuovi.
Nota
Il timeout per le richieste HTTP da Analisi di flusso a Funzioni di Azure è impostato su 100 secondi. Se l'app Funzioni di Azure richiede più di 100 secondi per elaborare un batch, analisi di flusso si verifica un errore e riprova per il batch.
La ripetizione dei tentativi per i timeout potrebbe comportare eventi duplicati scritti nel sink di output. Quando Analisi di flusso riprova a inviare un batch non riuscito, ripete il tentativo per tutti gli eventi del batch. Si consideri ad esempio un batch di 20 eventi inviati a Funzioni di Azure da Analisi di flusso. Si supponga che Funzioni di Azure impieghi 100 secondi per elaborare i primi 10 eventi del batch. Dopo 100 secondi, Analisi di flusso sospende la richiesta perché non ha ricevuto una risposta positiva da Funzioni di Azure e viene inviata un'altra richiesta per lo stesso batch. I primi 10 eventi del batch vengono elaborati di nuovi da Funzioni di Azure, generando un duplicato.
Problemi noti
Nel portale di Azure, quando si tenta di reimpostare il valore di Dimensioni massime batch/Numero massimo di batch su un valore vuoto (impostazione predefinita), al momento del salvataggio il valore viene reimpostato sul valore immesso in precedenza. In questo caso, immettere manualmente i valori predefiniti per questi campi.
L'uso del routing HTTP in Funzioni di Azure non è attualmente supportato da Analisi di flusso.
Il supporto per la connessione a Funzioni di Azure ospitato in una rete virtuale non è abilitato.
Pulire le risorse
Quando non sono più necessari, eliminare il gruppo di risorse, il processo di streaming e tutte le risorse correlate. Eliminando il processo si evita di pagare per le unità di streaming usate dal processo. Se si prevede di usare il processo in futuro, è possibile arrestarlo e riavviarlo in un secondo momento, quando è necessario. Se non si intende continuare a usare questo processo, eliminare tutte le risorse create da questa guida introduttiva attenendosi alla procedura seguente:
- Scegliere Gruppi di risorse dal menu a sinistra del portale di Azure e quindi selezionare il nome della risorsa creata.
- Nella pagina del gruppo di risorse selezionare Elimina, digitare il nome della risorsa da eliminare nella casella di testo e quindi selezionare Elimina.
Passaggi successivi
In questa esercitazione è stato creato un semplice processo di Analisi di flusso che esegue una funzione di Azure. Per altre informazioni sui processi di Analisi di flusso, continuare con l'esercitazione successiva: