Använda Java Message Service 1.1 med Azure Service Bus Standard och AMQP 1.0
Varning
Den här artikeln riktar sig till begränsat stöd för Java Message Service (JMS) 1.1 API och finns endast för Azure Service Bus-standardnivån.
Fullständigt stöd för Java Message Service 2.0-API:et är endast tillgängligt på Azure Service Bus Premium-nivån. Vi rekommenderar att du använder den här nivån.
Den här artikeln beskriver hur du använder Service Bus-meddelandefunktioner från Java-program med hjälp av den populära JMS API-standarden. Dessa meddelandefunktioner omfattar köer och publicering eller prenumeration på ämnen. En tillhörande artikel beskriver hur du gör samma sak med hjälp av Azure Service Bus .NET API. Du kan använda dessa två artiklar tillsammans för att lära dig mer om plattformsoberoende meddelanden med hjälp av ADVANCED Message Queuing Protocol (AMQP) 1.0.
AMQP 1.0 är ett effektivt, tillförlitligt meddelandeprotokoll på trådnivå som du kan använda för att skapa robusta, plattformsoberoende meddelandeprogram.
Stöd för AMQP 1.0 i Service Bus innebär att du kan använda funktionerna för köer och publicera eller prenumerera på asynkrona meddelanden från en rad olika plattformar med hjälp av ett effektivt binärt protokoll. Du kan också skapa program som består av komponenter som skapats med hjälp av en blandning av språk, ramverk och operativsystem.
Kom igång med Service Bus
Den här artikeln förutsätter att du redan har ett Service Bus-namnområde som innehåller en kö med namnet basicqueue
. Om du inte gör det kan du skapa namnområdet och kön med hjälp av Azure-portalen. Mer information om hur du skapar Service Bus-namnområden och köer finns i Komma igång med Service Bus-köer.
Kommentar
Partitionerade köer och ämnen stöder även AMQP. Mer information finns i Partitionerade meddelandeentiteter och AMQP 1.0-stöd för Service Bus-partitionerade köer och ämnen.
Ladda ned AMQP 1.0 JMS-klientbiblioteket
Information om var du kan ladda ned den senaste versionen av Apache Qpid JMS AMQP 1.0-klientbiblioteket finns på nedladdningswebbplatsen för Apache Qpid.
Du måste lägga till följande JAR-filer från Apache Qpid JMS AMQP 1.0-distributionsarkivet i Java CLASSPATH-miljövariabeln när du skapar och kör JMS-program med Service Bus:
- geronimo-jms_1.1_spec-1.0.jar
- qpid-jms-client-[version].jar
Kommentar
JMS JAR-namn och versioner kan ha ändrats. Mer information finns i Qpid JMS AMQP 1.0.
Koda Java-program
Java-namngivning och kataloggränssnitt
JMS använder Java Naming and Directory Interface (JNDI) för att skapa en separation mellan logiska namn och fysiska namn. Två typer av JMS-objekt löses med hjälp av JNDI: Anslut ionFactory och Destination. JNDI använder en providermodell där du kan ansluta olika katalogtjänster för att hantera namnmatchningsuppgifter. Apache Qpid JMS AMQP 1.0-biblioteket levereras med en enkel egenskapsfilbaserad JNDI-provider som konfigureras med hjälp av en egenskapsfil med följande format:
# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1
Konfigurera JNDI-kontext och konfigurera objektet Anslut ionFactory
Den anslutningssträng refereras till är den som är tillgänglig i principerna för delad åtkomst i Azure-portalen under Primär Anslut ionssträng.
// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
Konfigurera målköer för producent och konsument
Posten som används för att definiera ett mål i Qpid-egenskapsfilens JNDI-provider har följande format.
Så här skapar du en målkö för producenten:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
Så här skapar du en målkö för konsumenten:
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
Skriva JMS-programmet
Inga särskilda API:er eller alternativ krävs när du använder JMS med Service Bus. Det finns några begränsningar som kommer att omfattas senare. Precis som med alla JMS-program är det första som krävs konfigurationen av JNDI-miljön för att kunna lösa ett Anslut ionFactory-objekt och mål.
Konfigurera JNDI InitialContext-objektet
JNDI-miljön konfigureras genom att skicka en hash-tabell med konfigurationsinformation till konstruktorn för klassen javax.naming.InitialContext. De två obligatoriska elementen i hash-tabellen är klassnamnet för den inledande kontextfabriken och providerns URL. Följande kod visar hur du konfigurerar JNDI-miljön så att den använder den filbaserade JNDI-providern för Qpid-egenskaper med en egenskapsfil med namnet servicebus.properties.
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
Ett enkelt JMS-program som använder en Service Bus-kö
Följande exempelprogram skickar JMS-textmeddelanden till en Service Bus-kö med det logiska JNDI-namnet QUEUE och tar emot meddelandena tillbaka.
Du kan komma åt all källkod och konfigurationsinformation från snabbstarten för Azure Service Bus-exempel-JMS-kön.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* This sample demonstrates how to send messages from a JMS queue producer into
* an Azure Service Bus queue and receive them with a JMS message consumer.
* JMS queue.
*/
public class JmsQueueQuickstart {
// Number of messages to send
private static int totalSend = 10;
//Tracking counter for how many messages have been received; used as termination condition
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public void run(String connectionString) throws Exception {
// The connection string properties is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
// We create a scope here so we can use the same set of local variables cleanly
// again to show the receive side separately with minimal clutter.
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
// Send messages
for (int i = 0; i < totalSend; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(String.valueOf(i).getBytes());
producer.send(message);
System.out.printf("Sent message %d.\n", i + 1);
}
producer.close();
session.close();
connection.stop();
connection.close();
}
{
// Create connection
Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
connection.start();
// Create session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
// Create a listener callback to receive the messages
consumer.setMessageListener(message -> {
try {
// Received message is passed to callback
System.out.printf("Received message %d with sq#: %s\n",
totalReceived.incrementAndGet(), // increments the tracking counter
message.getJMSMessageID());
message.acknowledge();
} catch (Exception e) {
logger.error(e);
}
});
// Wait on the main thread until all sent messages have been received
while (totalReceived.get() < totalSend) {
Thread.sleep(1000);
}
consumer.close();
session.close();
connection.stop();
connection.close();
}
System.out.printf("Received all messages, exiting the sample.\n");
System.out.printf("Closing queue client.\n");
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
JmsQueueQuickstart app = new JmsQueueQuickstart();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// Parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// Get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}
Kör appen
Skicka Anslut ionssträngen från principerna för delad åtkomst för att köra programmet. Följande utdata är av formuläret som kör programmet:
> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"
Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.
AMQP-borttagning och Service Bus-åtgärdsmappning
Så här översätter en AMQP-borttagning till en Service Bus-åtgärd:
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
JMS-ämnen jämfört med Service Bus-ämnen
Användning av Service Bus-ämnen och -prenumerationer via JMS-API:et ger grundläggande funktioner för att skicka och ta emot. Det är ett bekvämt val när du porterar program från andra meddelandeköer med JMS-kompatibla API:er, även om Service Bus-ämnen skiljer sig från JMS-ämnen och kräver några justeringar.
Service Bus-ämnen dirigerar meddelanden till namngivna, delade och hållbara prenumerationer som hanteras via Azure Resource Management-gränssnittet, Azure-kommandoradsverktygen eller Azure-portalen. Varje prenumeration tillåter upp till 2 000 urvalsregler, som var och en kan ha ett filtervillkor och, för SQL-filter, även en metadatatransformeringsåtgärd. Varje filtervillkorsmatchning väljer det indatameddelande som ska kopieras till prenumerationen.
Att ta emot meddelanden från prenumerationer är identiskt med att ta emot meddelanden från köer. Varje prenumeration har en associerad kö med obeställbara meddelanden och möjligheten att automatiskt vidarebefordra meddelanden till en annan kö eller ett annat ämne.
JMS-ämnen gör det möjligt för klienter att dynamiskt skapa icke-varaktiga prenumeranter som kan tillåta filtrering av meddelanden med meddelandeväljare. Dessa odelade entiteter stöds inte av Service Bus. SQL-filterregelsyntaxen för Service Bus liknar syntaxen för meddelandeväljaren som stöds av JMS.
JMS-ämnesutgivarsidan är kompatibel med Service Bus, vilket visas i det här exemplet, men dynamiska prenumeranter är det inte. Följande topologirelaterade JMS-API:er stöds inte med Service Bus.
Funktioner och begränsningar som inte stöds
Följande begränsningar finns när du använder JMS via AMQP 1.0 med Service Bus, nämligen:
- Endast ett MessageProducer- eller MessageConsumer-objekt tillåts per session. Om du behöver skapa flera MessageProducer - eller MessageConsumer-objekt i ett program skapar du en dedikerad session för var och en av dem.
- Prenumerationer på flyktiga ämnen stöds inte för närvarande.
- MessageSelector-objekt stöds inte för närvarande.
- Distribuerade transaktioner stöds inte, men transacted-sessioner stöds.
Service Bus delar upp kontrollplanet från dataplanet så att det inte stöder flera av JMS dynamiska topologifunktioner.
Metod som inte stöds | Replace with |
---|---|
createDurableSubscriber | Skapa en ämnesprenumeration som portar meddelandeväljaren. |
createDurableConsumer | Skapa en ämnesprenumeration som portar meddelandeväljaren. |
createSharedConsumer | Service Bus-ämnen kan alltid delas. Se avsnittet "JMS-ämnen jämfört med Service Bus-ämnen". |
createSharedDurableConsumer | Service Bus-ämnen kan alltid delas. Se avsnittet "JMS-ämnen jämfört med Service Bus-ämnen". |
createTemporaryTopic | Skapa ett ämne via hanterings-API:et, verktygen eller portalen med AutoDeleteOnIdle inställt på en förfalloperiod. |
createTopic | Skapa ett ämne via hanterings-API:et, verktygen eller portalen. |
Unsubscribe | Ta bort ämneshanterings-API:et, verktygen eller portalen. |
createBrowser | Stöds inte. Använd Funktionen Peek() i Service Bus-API:et. |
createQueue | Skapa en kö via hanterings-API:et, verktygen eller portalen. |
createTemporaryQueue | Skapa en kö via hanterings-API:et, verktygen eller portalen med AutoDeleteOnIdle inställt på en förfalloperiod. |
receiveNoWait | Använd metoden receive() som tillhandahålls av Service Bus SDK och ange en mycket låg eller noll timeout. |
Sammanfattning
Den här artikeln visar hur du använder service bus-asynkrona meddelandefunktioner, till exempel köer och publicera eller prenumerera på ämnen, från Java med hjälp av det populära JMS-API:et och AMQP 1.0.
Du kan också använda Service Bus AMQP 1.0 från andra språk, till exempel .NET, C, Python och PHP. Komponenter som skapats med hjälp av dessa olika språk kan utbyta meddelanden på ett tillförlitligt och tillförlitligt sätt med hjälp av AMQP 1.0-stödet i Service Bus.