Mulai Cepat: Menerima acara dari Azure Event Hubs menggunakan Apache Storm

Apache Storm adalah sistem komputasi real-time terdistribusi yang menyederhanakan pemrosesan aliran data yang tidak terbatas secara andal. Bagian ini memperlihatkan cara menggunakan cerat Azure Event Hubs Storm untuk menerima acara dari Azure Event Hubs. Menggunakan Apache Storm, Anda dapat membagi peristiwa di beberapa proses yang dihosting di berbagai simpul. Integrasi Azure Event Hubs dengan Storm menyederhanakan konsumsi peristiwa dengan secara transparan menunjukkan kemajuannya menggunakan instalasi Zookeeper Storm, mengelola pos pemeriksaan terus-menerus dan menerima paralel dari Azure Event Hubs.

Untuk informasi selengkapnya tentang pola penerima Azure Event Hubs, lihat Ringkasan Azure Event Hubs.

Prasyarat

Sebelum Anda mulai dengan mulai cepat, buat ruang nama Azure Event Hubs dan hub peristiwa. Gunakan portal Microsoft Azure untuk membuat kumpulan nama jenis Azure Event Hubs, dan dapatkan kredensial manajemen yang diperlukan aplikasi Anda untuk berkomunikasi dengan pusat aktivitas. Untuk membuat kumpulan nama dan pusat aktivitas, ikuti prosedur di artikel ini.

Membuat proyek dan menambahkan kode

  1. Gunakan perintah berikut untuk menginstal paket ke penyimpanan Maven lokal. Ini memungkinkan Anda untuk menambahkannya sebagai referensi dalam proyek Storm di langkah selanjutnya.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  2. Di Eclipse, buat proyek Maven baru (klik File, lalu Baru, lalu Proyek).

    File -> Baru -> Proyek

  3. Pilih Gunakan lokasi Ruang Kerja default, lalu klik Berikutnya

  4. Pilih arketipe maven-archetype-quickstart, lalu klik Berikutnya

  5. Sisipkan GroupIddan Artefak, lalu klik Selesai

  6. Di pom.xml, tambahkan dependensi berikut dalam simpul <dependency>.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  7. Di folder src, buat file bernama Config.properties dan salin konten berikut ini, menggantikan nilai receive rule key dan event hub name:

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    Nilai untuk eventhub.receiver.credits menentukan berapa banyak peristiwa yang di batch sebelum melepaskannya ke alur Storm. Demi kesederhanaan, contoh ini menetapkan nilai ini menjadi 10. Dalam produksi, biasanya harus diatur ke nilai yang lebih tinggi; misalnya, 1024. 1 . Buat kelas baru bernama LoggerBolt dengan kode berikut:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Baut Storm ini mencatat isi dari peristiwa yang diterima. Ini dapat dengan mudah diperluas untuk menyimpan tuple dalam layanan penyimpanan. Contoh HDInsight Storm dengan contoh Event Hub menggunakan pendekatan yang sama ini untuk menyimpan data ke dalam Azure Storage dan Power BI.

  8. Buat kelas yang disebut LogTopology dengan kode berikut:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Kelas ini membuat cerat Azure Event Hubs baru, menggunakan properti dalam file konfigurasi untuk membuatnya secara instan. Penting untuk dicatat bahwa contoh ini membuat tugas cerat sebanyak jumlah partisi di hub acara, untuk menggunakan paralelisme maksimum yang diizinkan oleh hub acara itu.

Langkah berikutnya

Anda dapat mempelajari selengkapnya tentang Azure Event Hubs dengan mengunjungi tautan berikut: