Dela via


Använda Java Message Service 1.1 med Azure Service Bus Standard och AMQP 1.0

Varning

Den här artikeln riktar sig till begränsat stöd för Java Message Service (JMS) 1.1 API och finns endast för Azure Service Bus-standardnivån.

Fullständigt stöd för Java Message Service 2.0-API:et är endast tillgängligt på Azure Service Bus Premium-nivån. Vi rekommenderar att du använder den här nivån.

Den här artikeln beskriver hur du använder Service Bus-meddelandefunktioner från Java-program med hjälp av den populära JMS API-standarden. Dessa meddelandefunktioner omfattar köer och publicering eller prenumeration på ämnen. En tillhörande artikel beskriver hur du gör samma sak med hjälp av Azure Service Bus .NET API. Du kan använda dessa två artiklar tillsammans för att lära dig mer om plattformsoberoende meddelanden med hjälp av ADVANCED Message Queuing Protocol (AMQP) 1.0.

AMQP 1.0 är ett effektivt, tillförlitligt meddelandeprotokoll på trådnivå som du kan använda för att skapa robusta, plattformsoberoende meddelandeprogram.

Stöd för AMQP 1.0 i Service Bus innebär att du kan använda funktionerna för köer och publicera eller prenumerera på asynkrona meddelanden från en rad olika plattformar med hjälp av ett effektivt binärt protokoll. Du kan också skapa program som består av komponenter som skapats med hjälp av en blandning av språk, ramverk och operativsystem.

Kom igång med Service Bus

Den här artikeln förutsätter att du redan har ett Service Bus-namnområde som innehåller en kö med namnet basicqueue. Om du inte gör det kan du skapa namnområdet och kön med hjälp av Azure-portalen. Mer information om hur du skapar Service Bus-namnområden och köer finns i Komma igång med Service Bus-köer.

Kommentar

Partitionerade köer och ämnen stöder även AMQP. Mer information finns i Partitionerade meddelandeentiteter och AMQP 1.0-stöd för Service Bus-partitionerade köer och ämnen.

Ladda ned AMQP 1.0 JMS-klientbiblioteket

Information om var du kan ladda ned den senaste versionen av Apache Qpid JMS AMQP 1.0-klientbiblioteket finns på nedladdningswebbplatsen för Apache Qpid.

Du måste lägga till följande JAR-filer från Apache Qpid JMS AMQP 1.0-distributionsarkivet i Java CLASSPATH-miljövariabeln när du skapar och kör JMS-program med Service Bus:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Kommentar

JMS JAR-namn och versioner kan ha ändrats. Mer information finns i Qpid JMS AMQP 1.0.

Koda Java-program

Java-namngivning och kataloggränssnitt

JMS använder Java Naming and Directory Interface (JNDI) för att skapa en separation mellan logiska namn och fysiska namn. Två typer av JMS-objekt löses med hjälp av JNDI: Anslut ionFactory och Destination. JNDI använder en providermodell där du kan ansluta olika katalogtjänster för att hantera namnmatchningsuppgifter. Apache Qpid JMS AMQP 1.0-biblioteket levereras med en enkel egenskapsfilbaserad JNDI-provider som konfigureras med hjälp av en egenskapsfil med följande format:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Konfigurera JNDI-kontext och konfigurera objektet Anslut ionFactory

Den anslutningssträng refereras till är den som är tillgänglig i principerna för delad åtkomst i Azure-portalen under Primär Anslut ionssträng.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Konfigurera målköer för producent och konsument

Posten som används för att definiera ett mål i Qpid-egenskapsfilens JNDI-provider har följande format.

Så här skapar du en målkö för producenten:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Så här skapar du en målkö för konsumenten:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Skriva JMS-programmet

Inga särskilda API:er eller alternativ krävs när du använder JMS med Service Bus. Det finns några begränsningar som kommer att omfattas senare. Precis som med alla JMS-program är det första som krävs konfigurationen av JNDI-miljön för att kunna lösa ett Anslut ionFactory-objekt och mål.

Konfigurera JNDI InitialContext-objektet

JNDI-miljön konfigureras genom att skicka en hash-tabell med konfigurationsinformation till konstruktorn för klassen javax.naming.InitialContext. De två obligatoriska elementen i hash-tabellen är klassnamnet för den inledande kontextfabriken och providerns URL. Följande kod visar hur du konfigurerar JNDI-miljön så att den använder den filbaserade JNDI-providern för Qpid-egenskaper med en egenskapsfil med namnet servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Ett enkelt JMS-program som använder en Service Bus-kö

Följande exempelprogram skickar JMS-textmeddelanden till en Service Bus-kö med det logiska JNDI-namnet QUEUE och tar emot meddelandena tillbaka.

Du kan komma åt all källkod och konfigurationsinformation från snabbstarten för Azure Service Bus-exempel-JMS-kön.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Kör appen

Skicka Anslut ionssträngen från principerna för delad åtkomst för att köra programmet. Följande utdata är av formuläret som kör programmet:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP-borttagning och Service Bus-åtgärdsmappning

Så här översätter en AMQP-borttagning till en Service Bus-åtgärd:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS-ämnen jämfört med Service Bus-ämnen

Användning av Service Bus-ämnen och -prenumerationer via JMS-API:et ger grundläggande funktioner för att skicka och ta emot. Det är ett bekvämt val när du porterar program från andra meddelandeköer med JMS-kompatibla API:er, även om Service Bus-ämnen skiljer sig från JMS-ämnen och kräver några justeringar.

Service Bus-ämnen dirigerar meddelanden till namngivna, delade och hållbara prenumerationer som hanteras via Azure Resource Management-gränssnittet, Azure-kommandoradsverktygen eller Azure-portalen. Varje prenumeration tillåter upp till 2 000 urvalsregler, som var och en kan ha ett filtervillkor och, för SQL-filter, även en metadatatransformeringsåtgärd. Varje filtervillkorsmatchning väljer det indatameddelande som ska kopieras till prenumerationen.

Att ta emot meddelanden från prenumerationer är identiskt med att ta emot meddelanden från köer. Varje prenumeration har en associerad kö med obeställbara meddelanden och möjligheten att automatiskt vidarebefordra meddelanden till en annan kö eller ett annat ämne.

JMS-ämnen gör det möjligt för klienter att dynamiskt skapa icke-varaktiga prenumeranter som kan tillåta filtrering av meddelanden med meddelandeväljare. Dessa odelade entiteter stöds inte av Service Bus. SQL-filterregelsyntaxen för Service Bus liknar syntaxen för meddelandeväljaren som stöds av JMS.

JMS-ämnesutgivarsidan är kompatibel med Service Bus, vilket visas i det här exemplet, men dynamiska prenumeranter är det inte. Följande topologirelaterade JMS-API:er stöds inte med Service Bus.

Funktioner och begränsningar som inte stöds

Följande begränsningar finns när du använder JMS via AMQP 1.0 med Service Bus, nämligen:

  • Endast ett MessageProducer- eller MessageConsumer-objekt tillåts per session. Om du behöver skapa flera MessageProducer - eller MessageConsumer-objekt i ett program skapar du en dedikerad session för var och en av dem.
  • Prenumerationer på flyktiga ämnen stöds inte för närvarande.
  • MessageSelector-objekt stöds inte för närvarande.
  • Distribuerade transaktioner stöds inte, men transacted-sessioner stöds.

Service Bus delar upp kontrollplanet från dataplanet så att det inte stöder flera av JMS dynamiska topologifunktioner.

Metod som inte stöds Replace with
createDurableSubscriber Skapa en ämnesprenumeration som portar meddelandeväljaren.
createDurableConsumer Skapa en ämnesprenumeration som portar meddelandeväljaren.
createSharedConsumer Service Bus-ämnen kan alltid delas. Se avsnittet "JMS-ämnen jämfört med Service Bus-ämnen".
createSharedDurableConsumer Service Bus-ämnen kan alltid delas. Se avsnittet "JMS-ämnen jämfört med Service Bus-ämnen".
createTemporaryTopic Skapa ett ämne via hanterings-API:et, verktygen eller portalen med AutoDeleteOnIdle inställt på en förfalloperiod.
createTopic Skapa ett ämne via hanterings-API:et, verktygen eller portalen.
Unsubscribe Ta bort ämneshanterings-API:et, verktygen eller portalen.
createBrowser Stöds inte. Använd Funktionen Peek() i Service Bus-API:et.
createQueue Skapa en kö via hanterings-API:et, verktygen eller portalen.
createTemporaryQueue Skapa en kö via hanterings-API:et, verktygen eller portalen med AutoDeleteOnIdle inställt på en förfalloperiod.
receiveNoWait Använd metoden receive() som tillhandahålls av Service Bus SDK och ange en mycket låg eller noll timeout.

Sammanfattning

Den här artikeln visar hur du använder service bus-asynkrona meddelandefunktioner, till exempel köer och publicera eller prenumerera på ämnen, från Java med hjälp av det populära JMS-API:et och AMQP 1.0.

Du kan också använda Service Bus AMQP 1.0 från andra språk, till exempel .NET, C, Python och PHP. Komponenter som skapats med hjälp av dessa olika språk kan utbyta meddelanden på ett tillförlitligt och tillförlitligt sätt med hjälp av AMQP 1.0-stödet i Service Bus.

Nästa steg