Oefening: de gebeurtenissen verwerken en de gegevens opslaan in Azure Cosmos DB
Een tweede functie kan luisteren naar gebeurtenissen van de specifieke naamruimte in de Azure Event Hub en deze verwerken en opslaan in een database die is gemaakt met Azure Cosmos DB.
Een database maken met Azure Cosmos DB
Gebruik de opdracht az cosmosdb create om de database te maken. De opdracht maakt gebruik van een Azure Cosmos DB-account, een database en een SQL-container.
az cosmosdb create \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--name TelemetryDb
az cosmosdb sql container create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--database-name TelemetryDb \
--name TelemetryInfo \
--partition-key-path '/temperatureStatus'
Voor ons scenario is de temperatuur interessant. Daarom definiëren we temperatureStatus als de partitiesleutel.
Een andere Azure-functie bouwen, configureren en implementeren
Met Event Hubs kunt u starten met gegevensstromen in megabytes en opschalen naar gigabytes of terabytes. De functie voor automatisch vergroten is een van de vele opties die beschikbaar zijn om het aantal doorvoereenheden te schalen om te voldoen aan uw gebruiksbehoeften.
De applicaties die gebruikmaken van elke functie hebben een eigen weergave van de reeks gebeurtenissen. Ze lezen de stroom onafhankelijk in hun eigen tempo en met hun eigen verschuivingen.
Voor ons scenario maakt u één verbruikende Azure-functie als voorbeeld. Als u de functie wilt maken volgens de aanbevolen procedures, moet deze onafhankelijk zijn, met een eigen opslagaccount en bindingen voor losse koppeling en schaalbaarheid.
az storage account create \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--sku Standard_LRS
az functionapp create \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c"\
--storage-account $STORAGE_ACCOUNT"c" \
--consumption-plan-location $LOCATION \
--runtime java \
--functions-version 4
De verbindingsreeksen ophalen
De consumentenfunctie moet rekening houden met het opslagaccount en de Event Hub. Het moet ook rekening houden met de database waarnaar de verwerkte gebeurtenissen worden geschreven.
AZURE_WEB_JOBS_STORAGE=$( \
az storage account show-connection-string \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--query connectionString \
--output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
az cosmosdb keys list \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT \
--type connection-strings \
--query 'connectionStrings[0].connectionString' \
--output tsv)
echo $COSMOS_DB_CONNECTION_STRING
U kunt de opdracht echo $EVENT_HUB_CONNECTION_STRING gebruiken om te controleren of de variabele nog steeds juist is ingesteld. Voer anders de volgende opdracht opnieuw uit:
EVENT_HUB_CONNECTION_STRING=$( \
az eventhubs eventhub authorization-rule keys list \
--resource-group $RESOURCE_GROUP \
--name $EVENT_HUB_AUTHORIZATION_RULE \
--eventhub-name $EVENT_HUB_NAME \
--namespace-name $EVENT_HUB_NAMESPACE \
--query primaryConnectionString \
--output tsv)
echo $EVENT_HUB_CONNECTION_STRING
Deze verbindingsreeksen moeten worden opgeslagen in de toepassingsinstellingen voor uw Azure Functions-account.
az functionapp config appsettings set \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c" \
--settings \
AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING
Notitie
Voor productieomgevingen kunt u een exemplaar van Azure Key Vault gebruiken om de verbindingsreeksen op te slaan en te beheren.
De functietoepassing maken
Voordat u de volgende functie maakt, moet u ervoor zorgen dat u zich in de juiste map bevindt.
cd ..
mvn archetype:generate --batch-mode \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName=$FUNCTION_APP"-c" \
-DresourceGroup=$RESOURCE_GROUP \
-DappRegion=$LOCATION \
-DappServicePlanName=$LOCATION"plan" \
-DgroupId=com.learn \
-DartifactId=telemetry-functions-consumer
Met de opdracht maakt u een toepassing zoals in de laatste oefening. U verwijdert de testbestanden, werkt de local.settings.file bij met de opdracht fetch-app-settings en vervangt vervolgens het bestaande Function.java-bestand.
cd telemetry-functions-consumer
rm -r src/test
Werk de lokale instellingen voor lokale uitvoering en foutopsporing bij.
func azure functionapp fetch-app-settings $FUNCTION_APP"-c"
Open vervolgens het bestand Function.java en vervang de inhoud door de volgende code:
package com.learn;
import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
public class Function {
@FunctionName("processSensorData")
public void processSensorData(
@EventHubTrigger(
name = "msg",
eventHubName = "", // blank because the value is included in the connection string
cardinality = Cardinality.ONE,
connection = "EventHubConnectionString")
TelemetryItem item,
@CosmosDBOutput(
name = "databaseOutput",
databaseName = "TelemetryDb",
collectionName = "TelemetryInfo",
connectionStringSetting = "CosmosDBConnectionString")
OutputBinding<TelemetryItem> document,
final ExecutionContext context) {
context.getLogger().info("Event hub message received: " + item.toString());
if (item.getPressure() > 30) {
item.setNormalPressure(false);
} else {
item.setNormalPressure(true);
}
if (item.getTemperature() < 40) {
item.setTemperatureStatus(status.COOL);
} else if (item.getTemperature() > 90) {
item.setTemperatureStatus(status.HOT);
} else {
item.setTemperatureStatus(status.WARM);
}
document.setValue(item);
}
}
Maak een nieuw bestand met de naam TelemetryItem.java op dezelfde locatie als Function.java en voeg de volgende code toe:
package com.learn;
public class TelemetryItem {
private String id;
private double temperature;
private double pressure;
private boolean isNormalPressure;
private status temperatureStatus;
static enum status {
COOL,
WARM,
HOT
}
public TelemetryItem(double temperature, double pressure) {
this.temperature = temperature;
this.pressure = pressure;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getPressure() {
return pressure;
}
@Override
public String toString() {
return "TelemetryItem={id=" + id + ",temperature="
+ temperature + ",pressure=" + pressure + "}";
}
public boolean isNormalPressure() {
return isNormalPressure;
}
public void setNormalPressure(boolean isNormal) {
this.isNormalPressure = isNormal;
}
public status getTemperatureStatus() {
return temperatureStatus;
}
public void setTemperatureStatus(status temperatureStatus) {
this.temperatureStatus = temperatureStatus;
}
}
Wanneer de Event Hub het bericht ontvangt, wordt er een gebeurtenis gegenereerd. De processSensorData-functie wordt uitgevoerd wanneer deze de gebeurtenis ontvangt. Vervolgens worden de gebeurtenisgegevens verwerkt en wordt een uitvoerbinding van Azure Cosmos DB gebruikt om de resultaten naar de database te verzenden. We gebruiken de TelemetryItem.java klasse opnieuw. De TelemetryItem objecten kunnen worden gezien als het consumentgestuurde contract tussen de deelnemers aan dit gebeurtenisgestuurde systeem.
Lokaal uitvoeren
Met Azure Functions kunt u overal ter wereld gebeurtenissen ontvangen. Ja, u kunt zelfs gebeurtenissen lokaal ontvangen op uw ontwikkelcomputer.
mvn clean package
mvn azure-functions:run
Na de build- en opstartberichten ziet u de binnenkomende gebeurtenissen wanneer de functie wordt uitgevoerd:
[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker
Ga in Azure Portal naar uw Azure Cosmos DB-account. Selecteer Data Explorer-, selecteer TelemetryInfoen selecteer vervolgens Items om uw gegevens weer te geven wanneer deze binnenkomen.
Implementeren in Azure
Nu gaan we de hele workload in de cloud verschuiven. Als u de functies wilt implementeren in Azure Functions, gebruikt u de Maven-opdracht mvn azure-functions:deploy. Zorg ervoor dat u zich nog steeds in de juiste opslagplaats bevindt, telemetrie-functies.
mvn azure-functions:deploy
Wonderbaar! We hebben het hele telemetriescenario geïmplementeerd door de gegevens naar een Event Hub te verzenden en de gegevens met een andere onafhankelijke functie te gebruiken. De functie verwerkt de gegevens en slaat vervolgens het resultaat op in een database die is gemaakt met Azure Cosmos DB. Hoe kunnen we ervoor zorgen dat onze toepassing voldoet aan onze vooraf gedefinieerde vereisten? Door bewaking te gebruiken.