Rövid útmutató: Események fogadása az Event Hubsból az Apache Storm használatával

Az Apache Storm egy elosztott, valós idejű számítási rendszer, amely leegyszerűsíti a kötetlen adatstreamek megbízható feldolgozását. Ez a szakasz bemutatja, hogyan fogadhatja az eseményeket az Event Hubsból egy Azure Event Hubs Storm-spout használatával. Az Apache Storm használatával több különböző csomóponton üzemeltetett folyamatra oszthatja fel az eseményeket. Az Event Hubs És a Storm integrációja leegyszerűsíti az eseményfelhasználást azáltal, hogy átláthatóan ellenőrzi annak előrehaladását a Storm Zookeeper telepítésével, az állandó ellenőrzőpontok kezelésével és az Event Hubstól érkező párhuzamos fogadásokkal.

Az Eseményközpontok fogadási mintáival kapcsolatos további információkért tekintse meg az Event Hubs áttekintését.

Előfeltételek

Mielőtt elkezdené a rövid útmutatót, hozzon létre egy Event Hubs-névteret és egy eseményközpontot. Az Azure Portal használatával hozzon létre egy Event Hubs típusú névteret, és szerezze be az alkalmazásnak az eseményközponttal való kommunikációhoz szükséges felügyeleti hitelesítő adatokat. Névtér és eseményközpont létrehozásához kövesse az ebben a cikkben ismertetett eljárást.

Projekt létrehozása és kód hozzáadása

  1. Az alábbi paranccsal telepítse a csomagot a helyi Maven-tárolóba. Ez lehetővé teszi, hogy egy későbbi lépésben hivatkozásként adja hozzá a Storm-projekthez.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  2. Az Eclipse-ben hozzon létre egy új Maven-projektet (kattintson a Fájl, majd az Új, majd a Project elemre).

    Fájl –> Új –> Projekt

  3. Válassza a Munkaterület alapértelmezett helyének használata lehetőséget, majd kattintson a Tovább gombra.

  4. Válassza ki a maven-archetype-quickstart archetype elemet, majd kattintson a Tovább gombra.

  5. GroupId és ArtifactId beszúrása, majd kattintson a Befejezés gombra

  6. A pom.xmladja hozzá a következő függőségeket a <dependency> csomóponthoz.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  7. Az src mappában hozzon létre egy Config.properties nevű fájlt, és másolja ki a következő tartalmat, a és event hub name az receive rule key értékek helyettesítésével:

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    Az eventhub.receiver.credits értéke határozza meg, hogy hány esemény van kötegelve, mielőtt felengedné őket a Storm-folyamatba. Az egyszerűség kedvéért ez a példa ezt az értéket 10-re állítja. Éles környezetben általában magasabb értékekre kell állítani; például 1024. 1 . Hozzon létre egy LoggerBolt nevű új osztályt a következő kóddal:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Ez a Storm-bolt naplózza a fogadott események tartalmát. Ez egyszerűen kiterjeszthető a tárolási szolgáltatásban tárolt rekordok tárolására. A HDInsight Storm és az Event Hub példájában ugyanezzel a módszerrel tárol adatokat az Azure Storage-ban és a Power BI-ban.

  8. Hozzon létre egy LogTopology nevű osztályt a következő kóddal:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Ez az osztály létrehoz egy új Event Hubs-spoutot a konfigurációs fájl tulajdonságainak használatával a példányosításhoz. Fontos megjegyezni, hogy ez a példa annyi spouts feladatot hoz létre, mint az eseményközpont partícióinak száma, hogy az adott eseményközpont által megengedett maximális párhuzamosságot használja.

Következő lépések

Az alábbi webhelyeken további információt talál az Event Hubsról: