Elaborare dati IoT in tempo reale in Apache Flink® con Azure HDInsight nel servizio Azure Kubernetes
L'hub IoT di Azure è un servizio gestito, ospitato nel cloud, che funge da hub centrale di messaggi per le comunicazioni bidirezionali tra l'applicazione di IoT e i relativi dispositivi collegati. È possibile connettere milioni di dispositivi e le relative soluzioni back-end in modo affidabile e sicuro. Quasi tutti i dispositivi possono essere connessi a un hub IoT.
In questo esempio il codice elabora i dati IoT in tempo reale in Apache Flink® con Azure HDInsight nel servizio Azure Kubernetes e sink nell'archiviazione ADLS Gen2.
Prerequisiti
- Creare un'istanza di Azure IoTHub
- Creare un cluster Flink 1.17.0 in HDInsight nel servizio Azure Kubernetes
- Usare MSI per accedere ad ADLS Gen2
- IntelliJ per lo sviluppo
Nota
Per questa dimostrazione si usa una macchina virtuale Window come progetto Maven sviluppare env nella stessa rete virtuale di HDInsight nel servizio Azure Kubernetes.
Cluster Flink 1.17.0 in HDInsight nel servizio Azure Kubernetes
Hub IOT di Azure in portale di Azure
All'interno del stringa di connessione è possibile trovare un URL del bus di servizio (URL dello spazio dei nomi dell'hub eventi sottostante), che è necessario aggiungere come server bootstrap nell'origine Kafka. In questo esempio è iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Preparare il messaggio nel dispositivo IOT di Azure
Ogni hub IoT include endpoint di sistema predefiniti per gestire i messaggi di sistema e dei dispositivi.
Per altre informazioni, vedere Come usare VS Code come simulatore di dispositivi hub IoT.
Codice in Flink
IOTdemo.java
KafkaSource: IoTHub si basa sull'hub eventi e supporta quindi un'API simile a kafka. Nel processo Flink è quindi possibile definire un Oggetto KafkaSource con parametri appropriati per l'utilizzo dei messaggi da IoTHub.
FileSink: definire il sink ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Creare il pacchetto con estensione jar e inviare il processo nel cluster Flink
Caricare il file JAR nel pod Webssh e inviare il file JAR.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Controllare il processo nell'interfaccia utente del dashboard Flink
Controllare il risultato in ADLS Gen2 in portale di Azure
Riferimento
- Sito Web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).