Partage via


Écrire des messages d’événement dans Azure Data Lake Storage Gen2 avec l’API DataStream Apache Flink®

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou tout envoi de suggestions sur la fonctionnalité, veuillez soumettre une requête sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur la Communauté Azure HDInsight.

Apache Flink utilise des systèmes de fichiers pour consommer et stocker de manière persistante des données, à la fois pour les résultats des applications et pour la tolérance aux pannes et la récupération. Dans cet article, découvrez comment écrire des messages d'événement dans Azure Data Lake Storage Gen2 avec l'API DataStream.

Prérequis

Ce connecteur de système de fichiers offre les mêmes garanties pour BATCH et STREAMING et est conçu pour fournir exactement une sémantique unique pour l'exécution de STREAMING. Pour plus d’informations, consultez Système de fichiers Flink DataStream.

Connecteur Apache Kafka

Flink fournit un connecteur Apache Kafka pour lire et écrire des données dans des sujets Kafka avec des garanties uniques. Pour plus d’informations, consultez Connecteur Apache Kafka.

pom.xml sur IntelliJ IDEA

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Programme pour l'évier ADLS Gen2

abfsGen2.java

Remarque

Remplacez les bootStrapServers Cluster Apache Kafka sur HdInsight par vos propres répartiteurs pour Kafka 3.2

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

Empaquetez le fichier jar, puis envoyez-le à Apache Flink.

  1. Chargez le fichier jar dans ABFS.

    Capture d’écran montrant l’écran en mode d’application Flink.

  2. Transmettez les informations jar du travail lors de la création de cluster AppMode.

    Capture d'écran montrant le mode de création d’une application.

    Remarque

    Veillez à ajouter classloader.resolve-order comme « parent-first » et hadoop.classpath.enable comme true

  3. Sélectionnez l’agrégation du journal des travaux pour envoyer (push) les journaux des travaux au compte de stockage.

    Capture d’écran montrant comment activer le journal des travaux.

  4. Le travail en cours d’exécution apparaît.

    Capture d’écran montrant l’interface utilisateur Flink.

Valider les données de streaming sur ADLS Gen2

Vous assistez à la diffusion en continu click_events dans ADLS Gen2.

Capture d’écran montrant la sortie ADLS Gen2.Capture d’écran montrant la sortie de l’événement Flink click.

Vous pouvez spécifier une stratégie de roulement qui lance le fichier pièce en cours dans l'une des trois conditions suivantes :

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Référence