연습 - 이벤트를 처리하고 데이터를 Azure Cosmos DB에 저장

완료됨

두 번째 함수는 Azure Event Hub에서 특정 네임스페이스의 이벤트를 수신 대기하고 Azure Cosmos DB로 만든 데이터베이스에서 처리하고 저장할 수 있습니다.

Azure Cosmos DB로 데이터베이스 만들기

데이터베이스를 만들려면 az cosmosdb create 명령을 사용합니다. 이 명령은 Azure Cosmos DB 계정, 데이터베이스 및 SQL 컨테이너를 사용합니다.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

이 시나리오에서는 온도가 흥미롭습니다. 파티션 키로 temperatureStatus를 정의합니다.

또 다른 Azure 함수 작성, 구성, 배포

이벤트 허브를 사용하여 메가바이트 단위로 데이터 스트림을 시작하여 기가바이트 또는 테라바이트 단위로 확장할 수 있습니다. 자동 인플레이트 기능은 사용량 요구 사항에 맞게 처리량 단위 수를 조정하는 데 사용할 수 있는 여러 옵션 중 하나입니다.

각 함수에 사용되는 애플리케이션에는 이벤트 스트림에 대한 별도의 보기가 있습니다. 소비자는 자신의 속도로 자신의 오프셋을 통해 독립적으로 스트림을 읽습니다.

이 시나리오에서는 Azure 함수를 사용하는 함수를 한 가지 예로 만듭니다. 함수를 만들려면 모범 사례에 따라 자체 스토리지 계정 및 느슨한 결합 및 확장성을 위한 바인딩을 사용하여 독립적이어야 합니다.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

연결 문자열 검색

소비자 함수는 스토리지 계정과 이벤트 허브를 인식해야 합니다. 또한 처리된 이벤트를 작성하는 데이터베이스를 알고 있어야 합니다.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

이 명령을 echo $EVENT_HUB_CONNECTION_STRING 사용하여 변수가 여전히 올바르게 설정되어 있는지 검사 수 있습니다. 그렇지 않으면 다음 명령을 다시 실행합니다.

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

이러한 연결 문자열을 Azure Functions 계정의 애플리케이션 설정에 저장해야 합니다.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

참고

프로덕션 환경에서는 Azure Key Vault 인스턴스를 사용하여 연결 문자열을 저장하고 관리할 수 있습니다.

함수 애플리케이션 만들기

다음 함수를 만들기 전에 올바른 폴더에 있는지 확인합니다.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

이 명령은 마지막 연습과 같은 애플리케이션을 만듭니다. 테스트 파일을 삭제하고 명령으로 업데이트 local.settings.filefetch-app-settings 다음 기존 Function.java 파일을 바꿉다.

cd telemetry-functions-consumer
rm -r src/test

로컬 실행 및 디버깅에 대한 로컬 설정을 업데이트합니다.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

다음으로, Function.java 파일을 열고 내용을 다음 코드로 바꿉니다.

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

TelemetryItem.java라는 또 다른 새 파일을 Function.java와 같은 위치에 만들고 다음 코드를 추가합니다.

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

이벤트 허브에서 메시지를 받으면 이벤트가 생성됩니다. processSensorData 함수는 이벤트를 받으면 실행됩니다. 그런 다음 이벤트 데이터를 처리하고 Azure Cosmos DB의 출력 바인딩을 사용하여 결과를 데이터베이스로 보냅니다. 클래스를 TelemetryItem.java 다시 사용합니다. TelemetryItem 개체는 이 이벤트 구동 시스템의 참가자 간에 소비자 주도 계약으로 표시될 수 있습니다.

로컬 실행

Azure Functions를 사용하면 전 세계에서 이벤트를 받을 수 있습니다. 예, 개발 머신에서 로컬로 이벤트를 받을 수도 있습니다.

mvn clean package
mvn azure-functions:run

빌드 및 시작 메시지 후에 함수가 실행되면 들어오는 이벤트가 표시됩니다.

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Azure Portal에서 Azure Cosmos DB 계정으로 이동합니다. 데이터 탐색기를 선택하고, TelemetryInfo를 선택한 다음, 항목을 선택하여 데이터가 도착하면 이를 확인합니다.

Screenshot that shows TelemetryInfo in Azure Cosmos DB Data Explorer.

Azure에 배포

이제 클라우드에서 전체 워크로드를 이동하겠습니다. 함수를 Azure Functions에 배포하려면 Maven 명령을 mvn azure-functions:deploy사용합니다. 올바른 리포지토리 telemetry-functions를 사용하고 있는지 확인합니다.

mvn azure-functions:deploy

좋습니다. 데이터를 이벤트 허브로 보내고 다른 독립 함수로 데이터를 소비하여 전체 원격 분석 시나리오를 배포했습니다. 함수는 데이터를 처리한 후 Azure Cosmos DB로 만든 데이터베이스에 결과를 저장합니다. 애플리케이션에서 미리 정의된 요구 사항을 충족하고 있는지 확인하려면 어떻게 해야 하나요? 모니터링을 사용하여