Udostępnij za pośrednictwem


Zapisywanie komunikatów zdarzeń w usłudze Azure Data Lake Storage Gen2 przy użyciu interfejsu API apache Flink® DataStream

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Narzędzie Apache Flink używa systemów plików do korzystania z danych i ich trwałego przechowywania, zarówno w przypadku wyników aplikacji, jak i odporności na uszkodzenia i odzyskiwania. Z tego artykułu dowiesz się, jak zapisywać komunikaty o zdarzeniach w usłudze Azure Data Lake Storage Gen2 przy użyciu interfejsu API datastream.

Wymagania wstępne

Ten łącznik systemu plików zapewnia te same gwarancje zarówno dla usługi BATCH, jak i PRZESYŁANIA STRUMIENIOWEgo, i jest przeznaczony do zapewnienia dokładnie raz semantyki na potrzeby wykonywania przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Flink DataStream Filesystem.

Łącznik platformy Apache Kafka

Flink udostępnia łącznik platformy Apache Kafka do odczytywania danych z tematów platformy Kafka i zapisywania ich w tematach platformy Kafka z dokładnie jednokrotnymi gwarancjami. Aby uzyskać więcej informacji, zobacz Apache Kafka Connector (Łącznik platformy Apache Kafka).

pom.xml w środowisku IntelliJ IDEA

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Program dla ujścia usługi ADLS Gen2

abfsGen2.java

Uwaga

Zastąp platformę Apache Kafka w klastrze usługi HDInsight bootStrapServers własnymi brokerami dla platformy Kafka 3.2

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

Spakuj plik jar i prześlij go do narzędzia Apache Flink.

  1. Przekaż plik jar do systemu ABFS.

    Zrzut ekranu przedstawiający ekran trybu aplikacji Flink.

  2. Przekaż informacje o pliku jar zadania podczas AppMode tworzenia klastra.

    Zrzut ekranu przedstawiający tryb tworzenia aplikacji.

    Uwaga

    Pamiętaj, aby dodać klasloader.resolve-order jako "parent-first" i hadoop.classpath.enable jako true

  3. Wybierz pozycję Agregacja dziennika zadań, aby wypychać dzienniki zadań do konta magazynu.

    Zrzut ekranu przedstawiający sposób włączania dziennika zadań.

  4. Możesz zobaczyć uruchomione zadanie.

    Zrzut ekranu przedstawiający interfejs użytkownika Flink.

Weryfikowanie danych przesyłanych strumieniowo w usłudze ADLS Gen2

Widzimy click_events przesyłanie strumieniowe do usługi ADLS Gen2.

Zrzut ekranu przedstawiający dane wyjściowe usługi ADLS Gen2.Zrzut ekranu przedstawiający dane wyjściowe zdarzenia kliknięcia przycisku Flink.

Możesz określić zasady stopniowe, które rzutuje plik części w toku w dowolnym z następujących trzech warunków:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Odwołanie