Java Message Service 1.1'i Azure Service Bus standart ve AMQP 1.0 ile kullanma
Uyarı
Bu makale Java İleti Hizmeti (JMS) 1.1 API'sine yönelik sınırlı desteğe yöneliktir ve yalnızca Azure Service Bus standart katmanı için mevcuttur.
Java İleti Hizmeti 2.0 API'sine yönelik tam destek yalnızca Azure Service Bus premium katmanında kullanılabilir. Bu katmanı kullanmanızı öneririz.
Bu makalede, popüler JMS API standardını kullanarak Java uygulamalarından Service Bus mesajlaşma özelliklerinin nasıl kullanılacağı açıklanmaktadır. Bu mesajlaşma özellikleri kuyrukları ve konu başlıklarını yayımlamayı veya abone olmasını içerir. Yardımcı makalede , Azure Service Bus .NET API'sini kullanarak aynı işlemi nasıl yapabileceğiniz açıklanır. Gelişmiş Message Queuing Protokolü (AMQP) 1.0'ı kullanarak platformlar arası mesajlaşma hakkında bilgi edinmek için bu iki makaleyi birlikte kullanabilirsiniz.
AMQP 1.0, sağlam, platformlar arası mesajlaşma uygulamaları oluşturmak için kullanabileceğiniz verimli, güvenilir, kablo düzeyinde bir mesajlaşma protokolüdür.
Service Bus'ta AMQP 1.0 desteği, verimli bir ikili protokol kullanarak çeşitli platformlardan sıraya alma ve aracılı mesajlaşma özelliklerini yayımlayabileceğiniz veya abone olabileceğiniz anlamına gelir. Ayrıca dillerin, çerçevelerin ve işletim sistemlerinin bir karışımı kullanılarak oluşturulan bileşenlerden oluşan uygulamalar da oluşturabilirsiniz.
Service Bus’ı kullanmaya başlama
Bu makalede, adlı basicqueue
bir kuyruk içeren bir Service Bus ad alanınız olduğu varsayılır. Aksi takdirde, Azure portalını kullanarak ad alanını ve kuyruğu oluşturabilirsiniz. Service Bus ad alanlarını ve kuyruklarını oluşturma hakkında daha fazla bilgi için bkz . Service Bus kuyruklarını kullanmaya başlama.
Dekont
Bölümlenmiş kuyruklar ve konular da AMQP'yi destekler. Daha fazla bilgi için bkz . Bölümlenmiş mesajlaşma varlıkları ve Service Bus bölümlenmiş kuyrukları ve konuları için AMQP 1.0 desteği.
AMQP 1.0 JMS istemci kitaplığını indirme
Apache Qpid JMS AMQP 1.0 istemci kitaplığının en son sürümünün nereden indirileceği hakkında bilgi için Apache Qpid indirme sitesine bakın.
Service Bus ile JMS uygulamaları derleyip çalıştırırken Apache Qpid JMS AMQP 1.0 dağıtım arşivinden Java CLASSPATH ortam değişkenine aşağıdaki JAR dosyalarını eklemeniz gerekir:
- geronimo-jms_1.1_spec-1.0.jar
- qpid-jms-client-[version].jar
Dekont
JMS JAR adları ve sürümleri değişmiş olabilir. Daha fazla bilgi için bkz . Qpid JMS AMQP 1.0.
Java uygulamalarını kodlayın
Java Adlandırma ve Dizin Arabirimi
JMS, mantıksal adlarla fiziksel adlar arasında bir ayrım oluşturmak için Java Adlandırma ve Dizin Arabirimi'ni (JNDI) kullanır. JNDI kullanılarak iki tür JMS nesnesi çözümlenir: Bağlan ionFactory ve Destination. JNDI, ad çözümleme görevlerini işlemek için farklı dizin hizmetlerini takabileceğiniz bir sağlayıcı modeli kullanır. Apache Qpid JMS AMQP 1.0 kitaplığı, aşağıdaki biçimde bir özellik dosyası kullanılarak yapılandırılan basit bir özellik dosyası tabanlı JNDI sağlayıcısıyla birlikte gelir:
# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1
JNDI bağlamını ayarlama ve Bağlan ionFactory nesnesini yapılandırma
Başvurulan bağlantı dizesi, Azure portalında Birincil Bağlan ion Dizesi altındaki Paylaşılan Erişim İlkeleri'nde bulunan ilkedir.
// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
Üretici ve tüketici hedef kuyruklarını yapılandırma
Qpid özellikler dosyasında JNDI sağlayıcısında bir hedef tanımlamak için kullanılan giriş aşağıdaki biçimdedir.
Üretici için hedef kuyruk oluşturmak için:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
Tüketici için hedef kuyruk oluşturmak için:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
JMS uygulamasını yazma
Service Bus ile JMS kullanırken özel API'ler veya seçenekler gerekmez. Daha sonra ele alınabilecek birkaç kısıtlama vardır. Herhangi bir JMS uygulamasında olduğu gibi, gereken ilk şey bir Bağlan ionFactory nesnesini ve hedeflerini çözümleyebilmek için JNDI ortamının yapılandırılmasıdır.
JNDI InitialContext nesnesini yapılandırma
JNDI ortamı, yapılandırma bilgilerinin karma tablosunu javax.naming.InitialContext sınıfının oluşturucusuna geçirerek yapılandırılır. Karma tablodaki gerekli iki öğe, İlk Bağlam Fabrikası'nın ve sağlayıcı URL'sinin sınıf adıdır. Aşağıdaki kod, JNDI ortamının Qpid özellikleri dosya tabanlı JNDI sağlayıcısını servicebus.properties adlı bir özellik dosyasıyla kullanacak şekilde nasıl yapılandırıldığını gösterir.
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
Service Bus kuyruğu kullanan basit bir JMS uygulaması
Aşağıdaki örnek program, JMS metin iletilerini QUEUE'un JNDI mantıksal adıyla bir Service Bus kuyruğuna gönderir ve iletileri geri alır.
Azure Service Bus örnekleri JMS kuyruğu hızlı başlangıcından tüm kaynak koduna ve yapılandırma bilgilerine erişebilirsiniz.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* This sample demonstrates how to send messages from a JMS queue producer into
* an Azure Service Bus queue and receive them with a JMS message consumer.
* JMS queue.
*/
public class JmsQueueQuickstart {
// Number of messages to send
private static int totalSend = 10;
//Tracking counter for how many messages have been received; used as termination condition
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public void run(String connectionString) throws Exception {
// The connection string properties is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
// We create a scope here so we can use the same set of local variables cleanly
// again to show the receive side separately with minimal clutter.
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
// Send messages
for (int i = 0; i < totalSend; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(String.valueOf(i).getBytes());
producer.send(message);
System.out.printf("Sent message %d.\n", i + 1);
}
producer.close();
session.close();
connection.stop();
connection.close();
}
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
connection.start();
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
// Create a listener callback to receive the messages
consumer.setMessageListener(message -> {
try {
// Received message is passed to callback
System.out.printf("Received message %d with sq#: %s\n",
totalReceived.incrementAndGet(), // increments the tracking counter
message.getJMSMessageID());
message.acknowledge();
} catch (Exception e) {
logger.error(e);
}
});
// Wait on the main thread until all sent messages have been received
while (totalReceived.get() < totalSend) {
Thread.sleep(1000);
}
consumer.close();
session.close();
connection.stop();
connection.close();
}
System.out.printf("Received all messages, exiting the sample.\n");
System.out.printf("Closing queue client.\n");
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
JmsQueueQuickstart app = new JmsQueueQuickstart();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// Parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// Get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}
Uygulamayı çalıştırma
Uygulamayı çalıştırmak için Paylaşılan Erişim İlkeleri'nden Bağlan Ion Dizesini geçirin. Aşağıdaki çıkış, uygulamayı çalıştıran formdadır:
> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"
Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.
AMQP dağıtımı ve Service Bus işlem eşlemesi
AMQP dağıtımının Service Bus işlemine nasıl çevrildiği şöyledir:
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
JMS konuları ile Service Bus konuları karşılaştırması
JMS API aracılığıyla Service Bus konularının ve aboneliklerinin kullanılması temel gönderme ve alma özellikleri sağlar. Service Bus konuları JMS konularından farklı olsa ve birkaç ayarlama gerektirse de, JMS uyumlu API'ler ile diğer ileti aracılarından uygulama taşımanız kullanışlı bir seçimdir.
Service Bus konuları iletileri Azure Kaynak Yönetimi arabirimi, Azure komut satırı araçları veya Azure portalı aracılığıyla yönetilen adlandırılmış, paylaşılan ve dayanıklı aboneliklere yönlendirir. Her abonelik 2.000'e kadar seçim kuralına izin verir. Bunların her biri bir filtre koşuluna ve SQL filtreleri için de meta veri dönüştürme eylemine sahip olabilir. Her filtre koşulu eşleşmesi, aboneliğe kopyalanacak giriş iletisini seçer.
Aboneliklerden ileti almak, kuyruklardan ileti almakla aynıdır. Her aboneliğin ilişkili bir teslim edilemeyen kuyruk ve iletileri otomatik olarak başka bir kuyruğa veya konulara iletme özelliği vardır.
JMS konuları, istemcilerin isteğe bağlı olarak ileti seçicilerle iletileri filtrelemeye izin veren kalıcı olmayan ve dayanıklı aboneler oluşturmasına olanak tanır. Bu paylaşılmayan varlıklar Service Bus tarafından desteklenmez. Service Bus için SQL filtre kuralı söz dizimi, JMS tarafından desteklenen ileti seçici söz dizimine benzer.
JMS konusu yayımcı tarafı, bu örnekte gösterildiği gibi Service Bus ile uyumludur, ancak dinamik aboneler uyumlu değildir. Aşağıdaki topolojiyle ilgili JMS API'leri Service Bus ile desteklenmez.
Desteklenmeyen özellikler ve kısıtlamalar
Service Bus ile AMQP 1.0 üzerinden JMS kullandığınızda aşağıdaki kısıtlamalar vardır:
- Oturum başına yalnızca bir MessageProducer veya MessageConsumer nesnesine izin verilir. Bir uygulamada birden çok MessageProducer veya MessageConsumer nesnesi oluşturmanız gerekiyorsa, her biri için ayrılmış bir oturum oluşturun.
- Geçici konu abonelikleri şu anda desteklenmemektedir.
- MessageSelector nesneleri şu anda desteklenmemektedir.
- Dağıtılmış işlemler desteklenmez, ancak işlem yapılan oturumlar desteklenir.
Service Bus, denetim düzlemini veri düzleminden ayırır, bu nedenle JMS'nin dinamik topoloji işlevlerinin birkaçını desteklemez.
Desteklenmeyen yöntem | Replace with |
---|---|
createDurableSubscriber | İleti seçiciye bağlantı noktası oluşturan bir konu başlığı aboneliği oluşturun. |
createDurableConsumer | İleti seçiciye bağlantı noktası oluşturan bir konu başlığı aboneliği oluşturun. |
createSharedConsumer | Service Bus konuları her zaman paylaşılabilir. "JMS konuları ve Service Bus konuları" bölümüne bakın. |
createSharedDurableConsumer | Service Bus konuları her zaman paylaşılabilir. "JMS konuları ve Service Bus konuları" bölümüne bakın. |
createTemporaryTopic | AutoDeleteOnIdle'ın süre sonu olarak ayarlandığı yönetim API'si, araçlar veya portal aracılığıyla bir konu oluşturun. |
createTopic | Yönetim API'sini, araçları veya portalı kullanarak bir konu oluşturun. |
Iptal | Konu yönetimi API'sini, araçlarını veya portalını silin. |
createBrowser | Desteklenmez. Service Bus API'sinin Peek() işlevini kullanın. |
createQueue | Yönetim API'sini, araçları veya portalı kullanarak bir kuyruk oluşturun. |
createTemporaryQueue | AutoDeleteOnIdle'ın süre sonu olarak ayarlandığı yönetim API'sini, araçlarını veya portalını kullanarak bir kuyruk oluşturun. |
receiveNoWait | Service Bus SDK'sı tarafından sağlanan receive() yöntemini kullanın ve çok düşük veya sıfır bir zaman aşımı belirtin. |
Özet
Bu makalede, popüler JMS API'sini ve AMQP 1.0'ı kullanarak Java'dan kuyruklar ve yayımlama veya abone olma konuları gibi Service Bus aracılı mesajlaşma özelliklerinin nasıl kullanılacağı gösterildi.
.NET, C, Python ve PHP gibi diğer dillerden Service Bus AMQP 1.0'ı da kullanabilirsiniz. Bu farklı diller kullanılarak oluşturulan bileşenler, Service Bus'taki AMQP 1.0 desteğini kullanarak güvenilir ve tam uygunlukta ileti alışverişi yapabilir.