Použití služby Java Message Service 1.1 se standardem Azure Service Bus a AMQP 1.0
Upozorňující
Tento článek se zabývá omezenou podporou rozhraní API JMS (Java Message Service) 1.1 a existuje pouze pro úroveň Standard služby Azure Service Bus.
Úplná podpora rozhraní API služby Java Message Service 2.0 je dostupná pouze na úrovni Premium služby Azure Service Bus. Doporučujeme použít tuto úroveň.
Tento článek vysvětluje, jak používat funkce zasílání zpráv service Bus z aplikací v Javě pomocí oblíbeného standardu rozhraní JMS API. Mezi tyto funkce zasílání zpráv patří fronty a publikování nebo přihlášení k odběru témat. Doprovodný článek vysvětluje, jak to udělat stejně pomocí rozhraní .NET API služby Azure Service Bus. Tyto dva články můžete společně použít k získání informací o zasílání zpráv mezi platformami pomocí protokolu AMQP (Advanced Message Queuing Protocol) 1.0.
AMQP 1.0 je efektivní, spolehlivý protokol zasílání zpráv na úrovni drátu, který můžete použít k vytváření robustních aplikací pro zasílání zpráv napříč platformami.
Podpora AMQP 1.0 ve službě Service Bus znamená, že pomocí efektivního binárního protokolu můžete používat funkce řazení do front a publikování nebo odběru zprostředkovaných zpráv z celé řady platforem. Můžete také vytvářet aplikace složené z komponent vytvořených pomocí kombinace jazyků, architektur a operačních systémů.
Začínáme se službou Service Bus
Tento článek předpokládá, že již máte obor názvů služby Service Bus, který obsahuje frontu s názvem basicqueue
. Pokud ne, můžete vytvořit obor názvů a frontu pomocí webu Azure Portal. Další informace o tom, jak vytvářet obory názvů a fronty služby Service Bus, najdete v tématu Začínáme s frontami služby Service Bus.
Poznámka:
Dělené fronty a témata také podporují AMQP. Další informace najdete v tématu Dělení na entity zasílání zpráv a podpora AMQP 1.0 pro dělené fronty a témata služby Service Bus.
Stažení klientské knihovny AMQP 1.0 JMS
Informace o tom, kde stáhnout nejnovější verzi klientské knihovny Apache Qpid JMS AMQP 1.0, najdete na webu pro stažení Apache Qpid.
Při sestavování a spouštění aplikací JMS pomocí service Bus je nutné přidat následující soubory JAR z distribučního archivu Apache Qpid JMS AMQP 1.0 do proměnné prostředí Java CLASSPATH:
- geronimo-jms_1.1_spec-1.0.jar
- qpid-jms-client-[version].jar
Poznámka:
Můžou se změnit názvy a verze JMS JAR. Další informace najdete v tématu Qpid JMS AMQP 1.0.
Kódování aplikací v Javě
Pojmenování Java a rozhraní adresáře
JMS používá rozhraní JNDI (Java Naming and Directory Interface) k vytvoření oddělení mezi logickými názvy a fyzickými názvy. Dva typy objektů JMS se přeloží pomocí JNDI: Připojení ionFactory a Destination. JNDI používá model poskytovatele, do kterého můžete připojit různé adresářové služby ke zpracování povinností překladu ip adres. Knihovna Apache Qpid JMS AMQP 1.0 se dodává s jednoduchým zprostředkovatelem JNDI založeným na souborech vlastností, který je nakonfigurovaný pomocí souboru vlastností následujícího formátu:
# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1
Nastavení kontextu JNDI a konfigurace objektu Připojení ionFactory
Odkazovaný připojovací řetězec je ten, který je k dispozici v zásadách sdíleného přístupu na webu Azure Portal v části Primární řetězec Připojení ionu.
// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
Konfigurace cílových front producenta a příjemce
Položka použitá k definování cíle v souboru vlastností Qpid je zprostředkovatel JNDI v následujícím formátu.
Vytvoření cílové fronty pro producenta:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
Vytvoření cílové fronty pro příjemce:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
Napsání aplikace JMS
Pokud používáte JMS se službou Service Bus, nevyžadují se žádná speciální rozhraní API ani možnosti. Existuje několik omezení, která budou probíraná později. Stejně jako u jakékoli aplikace JMS je potřeba nejprve nakonfigurovat prostředí JNDI, aby bylo možné přeložit Připojení ionFactory objekt a cíle.
Konfigurace objektu JNDI InitialContext
Prostředí JNDI je nakonfigurováno předáním hash tabulky konfiguračních informací do konstruktoru javax.naming.InitialContext třídy. Dva požadované elementy v tabulce hash jsou název třídy Objekt pro vytváření počátečního kontextu a adresy URL zprostředkovatele. Následující kód ukazuje, jak nakonfigurovat prostředí JNDI tak, aby používalo zprostředkovatele JNDI vlastností Qpid se souborem vlastností s názvem servicebus.properties.
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
Jednoduchá aplikace JMS, která používá frontu služby Service Bus
Následující ukázkový program odešle textové zprávy JMS do fronty Service Bus s logickým názvem FRONTY JNDI a přijme zprávy zpět.
Přístup ke všem zdrojovým kódům a konfiguračním informacím najdete v rychlém startu fronty JMS s ukázkami služby Azure Service Bus.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* This sample demonstrates how to send messages from a JMS queue producer into
* an Azure Service Bus queue and receive them with a JMS message consumer.
* JMS queue.
*/
public class JmsQueueQuickstart {
// Number of messages to send
private static int totalSend = 10;
//Tracking counter for how many messages have been received; used as termination condition
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public void run(String connectionString) throws Exception {
// The connection string properties is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
// We create a scope here so we can use the same set of local variables cleanly
// again to show the receive side separately with minimal clutter.
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
// Send messages
for (int i = 0; i < totalSend; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(String.valueOf(i).getBytes());
producer.send(message);
System.out.printf("Sent message %d.\n", i + 1);
}
producer.close();
session.close();
connection.stop();
connection.close();
}
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
connection.start();
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
// Create a listener callback to receive the messages
consumer.setMessageListener(message -> {
try {
// Received message is passed to callback
System.out.printf("Received message %d with sq#: %s\n",
totalReceived.incrementAndGet(), // increments the tracking counter
message.getJMSMessageID());
message.acknowledge();
} catch (Exception e) {
logger.error(e);
}
});
// Wait on the main thread until all sent messages have been received
while (totalReceived.get() < totalSend) {
Thread.sleep(1000);
}
consumer.close();
session.close();
connection.stop();
connection.close();
}
System.out.printf("Received all messages, exiting the sample.\n");
System.out.printf("Closing queue client.\n");
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
JmsQueueQuickstart app = new JmsQueueQuickstart();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// Parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// Get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}
Spuštění aplikace
Předejte řetězec Připojení ion ze zásad sdíleného přístupu ke spuštění aplikace. Následující výstup je ve formuláři, na kterém je aplikace spuštěná:
> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"
Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.
AMQP disposition and Service Bus operation mapping
Tady je postup, jak se dispozice AMQP překládá na operaci Service Bus:
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
Témata JMS vs. témata služby Service Bus
Použití témat a odběrů služby Service Bus prostřednictvím rozhraní JMS API poskytuje základní možnosti odesílání a příjmu. Je to vhodná volba, když portujete aplikace z jiných zprostředkovatelů zpráv pomocí rozhraní API kompatibilních s JMS, i když se témata služby Service Bus liší od témat JMS a vyžadují několik úprav.
Témata služby Service Bus směrují zprávy do pojmenovaných, sdílených a trvalých předplatných spravovaných prostřednictvím rozhraní správy prostředků Azure, nástrojů příkazového řádku Azure nebo webu Azure Portal. Každé předplatné umožňuje až 2 000 pravidel výběru, z nichž každé může mít podmínku filtru a pro filtry SQL také akci transformace metadat. Každá podmínka filtru odpovídá vstupní zprávě, která se má zkopírovat do odběru.
Příjem zpráv z odběrů je stejný jako příjem zpráv z front. Každé předplatné má přidruženou frontu nedoručených zpráv a možnost automatického přeposílání zpráv do jiné fronty nebo témat.
Témata JMS umožňují klientům dynamicky vytvářet nedurovatelné a odolné odběratele, kteří volitelně umožňují filtrování zpráv pomocí selektorů zpráv. Service Bus tyto nesdílené entity nepodporuje. Syntaxe pravidla filtru SQL pro Service Bus je podobná syntaxi selektoru zpráv podporovanou JMS.
Stránka vydavatele tématu JMS je kompatibilní se službou Service Bus, jak je znázorněno v této ukázce, ale dynamické předplatitele nejsou. Service Bus nepodporuje následující rozhraní JMS API související s topologií.
Nepodporované funkce a omezení
Při použití JMS přes AMQP 1.0 se službou Service Bus existují následující omezení:
- Pro každou relaci je povolen pouze jeden objekt MessageProducer nebo MessageConsumer . Pokud potřebujete v aplikaci vytvořit více objektů MessageProducer nebo MessageConsumer , vytvořte pro každý z nich vyhrazenou relaci.
- Nestálá předplatná témat se v současné době nepodporují.
- Objekty MessageSelector se v současné době nepodporují.
- Distribuované transakce se nepodporují, ale podporují se transakční relace.
Service Bus rozdělí řídicí rovinu z roviny dat, takže nepodporuje několik funkcí dynamické topologie JMS.
Nepodporovaná metoda | Replace with |
---|---|
createDurableSubscriber | Vytvořte odběr tématu, který portuje selektor zpráv. |
createDurableConsumer | Vytvořte odběr tématu, který portuje selektor zpráv. |
createSharedConsumer | Témata služby Service Bus jsou vždy sdíletelná. Viz část Témata JMS vs. témata služby Service Bus. |
createSharedDurableConsumer | Témata služby Service Bus jsou vždy sdíletelná. Viz část Témata JMS vs. témata služby Service Bus. |
createTemporaryTopic | Vytvořte téma prostřednictvím rozhraní API pro správu, nástrojů nebo portálu s autoDeleteOnIdle nastaveným na dobu vypršení platnosti. |
createTopic | Vytvořte téma prostřednictvím rozhraní API pro správu, nástrojů nebo portálu. |
Odhlásit | Odstraňte rozhraní API, nástroje nebo portál pro správu témat. |
createBrowser | Nepodporuje se. Použijte funkci Náhled() rozhraní API služby Service Bus. |
createQueue | Vytvořte frontu prostřednictvím rozhraní API pro správu, nástrojů nebo portálu. |
createTemporaryQueue | Vytvořte frontu prostřednictvím rozhraní API pro správu, nástrojů nebo portálu s automatickým nastavením na dobu vypršení platnosti. |
receiveNoWait | Použijte metodu receive(), kterou poskytuje sada SDK služby Service Bus, a zadejte velmi nízký nebo nulový časový limit. |
Shrnutí
Tento článek vám ukázal, jak používat funkce zprostředkovaného zasílání zpráv služby Service Bus, jako jsou fronty a publikování nebo přihlášení k odběru témat, z Javy pomocí oblíbeného rozhraní JMS API a AMQP 1.0.
Service Bus AMQP 1.0 můžete použít také z jiných jazyků, jako jsou .NET, C, Python a PHP. Komponenty vytvořené pomocí těchto různých jazyků můžou spolehlivě a plně věrně vyměňovat zprávy pomocí podpory AMQP 1.0 ve službě Service Bus.