Écrire des messages d’événement dans Azure Data Lake Storage Gen2 avec l’API DataStream Apache Flink®
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base sera disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou tout envoi de suggestions sur la fonctionnalité, veuillez soumettre une requête sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur la Communauté Azure HDInsight.
Apache Flink utilise des systèmes de fichiers pour consommer et stocker de manière persistante des données, à la fois pour les résultats des applications et pour la tolérance aux pannes et la récupération. Dans cet article, découvrez comment écrire des messages d'événement dans Azure Data Lake Storage Gen2 avec l'API DataStream.
Prérequis
- Cluster Apache Flink sur HDInsight sur AKS
- Cluster Apache Kafka sur HDInsight
- Vous devez vérifier que les paramètres réseau ont été pris en charge comme décrit dans Utilisation d’Apache Kafka sur HDInsight. Vérifiez que HDInsight sur les clusters AKS et HDInsight se trouvent dans le même réseau virtuel.
- Utiliser MSI pour accéder à ADLS Gen2
- IntelliJ pour le développement sur une machine virtuelle Azure dans HDInsight sur le réseau virtuel AKS
Connecteur Apache Flink FileSystem
Ce connecteur de système de fichiers offre les mêmes garanties pour BATCH et STREAMING et est conçu pour fournir exactement une sémantique unique pour l'exécution de STREAMING. Pour plus d’informations, consultez Système de fichiers Flink DataStream.
Connecteur Apache Kafka
Flink fournit un connecteur Apache Kafka pour lire et écrire des données dans des sujets Kafka avec des garanties uniques. Pour plus d’informations, consultez Connecteur Apache Kafka.
Construire le projet pour Apache Flink
pom.xml sur IntelliJ IDEA
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Programme pour l'évier ADLS Gen2
abfsGen2.java
Remarque
Remplacez les bootStrapServers Cluster Apache Kafka sur HdInsight par vos propres répartiteurs pour Kafka 3.2
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
Empaquetez le fichier jar, puis envoyez-le à Apache Flink.
Chargez le fichier jar dans ABFS.
Transmettez les informations jar du travail lors de la création de cluster
AppMode
.Remarque
Veillez à ajouter classloader.resolve-order comme « parent-first » et hadoop.classpath.enable comme
true
Sélectionnez l’agrégation du journal des travaux pour envoyer (push) les journaux des travaux au compte de stockage.
Le travail en cours d’exécution apparaît.
Valider les données de streaming sur ADLS Gen2
Vous assistez à la diffusion en continu click_events
dans ADLS Gen2.
Vous pouvez spécifier une stratégie de roulement qui lance le fichier pièce en cours dans l'une des trois conditions suivantes :
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
Référence
- Connecteur Apache Kafka
- Système de fichiers Flink DataStream
- Site web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).