Quickstart: استقبل الأحداث من مراكز الأحداث باستخدام Apache Storm

Apache Storm هو نظام حسابي موزع في الوقت الفعلي يبسط المعالجة الموثوقة لتدفقات البيانات غير المحدودة. يوضح هذا القسم كيفية استخدام صنبور العاصفة في مراكز الأحداث لاستقبال الأحداث من مراكز الأحداث. باستخدام Apache Storm، يمكنك تقسيم الأحداث عبر عمليات متعددة مستضافة في عقد مختلفة. يعمل تكامل مراكز الأحداث مع Storm على تبسيط استهلاك الحدث من خلال تحديد تقدمه بشفافية باستخدام تثبيت Storm's Zookeeper، وإدارة نقاط التحقق المستمرة والاستلام الموازي من مراكز الأحداث.

لمزيد من المعلومات حول تلقي الأنماط لمراكز الأحداث، راجع "مراكز الأحداث overview".

المتطلبات الأساسية

قبل أن تبدأ بالبداية السريعة، "create an مراكز الأحداث namespace and an event hub" . استخدم "مدخل Microsoft Azure" لإنشاء مساحة اسم من نوع مراكز الأحداث، والحصول على بيانات اعتماد الإدارة التي يحتاجها تطبيقك للتواصل مع مركز الحدث. لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة.

إنشاء مشروع وإضافة رمز

  1. استخدم الأمر التالي لتثبيت الحزمة في متجر Maven المحلي. يمكّنك هذا من إضافته كمرجع في مشروع Storm في خطوة لاحقة.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  2. في Eclipse، قم بإنشاء مشروع Maven جديد (انقر فوق "File" ، ثم "New" ، ثم "Project" ).

    ملف -> جديد -> مشروع

  3. حدد "Use default Workspace location" ، ثم انقر على "Next"

  4. حدد النموذج الأصلي "maven-archetype-quickstart" ، ثم انقر على "Next"

  5. أدخل GroupId وArtifactId، ثم انقر على "Finish"

  6. في pom.xml، أضف التبعيات التالية في العقدة <dependency>.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  7. في المجلد src، أنشئ ملفاً يسمى Config.properties وانسخ المحتوى التالي، واستبدل قيمتي receive rule key وevent hub name:

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    تحدد قيمة eventhub.receiver.credits عدد الأحداث المجمعة قبل إطلاقها في مسار العاصفة. من أجل البساطة، يحدد هذا المثال هذه القيمة على 10. في الإنتاج، يجب عادةً ضبطه على قيم أعلى ؛ على سبيل المثال، 1024. 1 . قم بإنشاء فئة جديدة تسمى LoggerBolt بالرمز التالي:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    يسجل مسمار العاصفة هذا محتوى الأحداث المستلمة. يمكن توسيع هذا بسهولة لتخزين المجموعات في خدمة التخزين. يستخدم مثال HDInsight Storm with Event Hubنفس الأسلوب لتخزين البيانات في Azure Storage وPower BI.

  8. أنشئ فئة تسمى LogTopology بالرمز التالي:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    تنشئ هذه الفئة صنبور مراكز الأحداث جديد، باستخدام الخصائص الموجودة في ملف التكوين لإنشاء مثيل لها. من المهم ملاحظة أن هذا المثال يُنشئ العديد من مهام الأنبوب مثل عدد الأقسام في مركز الحدث، من أجل استخدام الحد الأقصى من التوازي المسموح به بواسطة مركز الحدث هذا.

الخطوات التالية

يمكنك معرفة المزيد عن مراكز الأحداث من خلال زيارة الروابط التالية: