Condividi tramite


Usare Java Message Service 1.1 con bus di servizio di Azure standard e AMQP 1.0

Avviso

Questo articolo è destinato al supporto limitato per l'API Java Message Service (JMS) 1.1 ed esiste solo per il livello standard bus di servizio di Azure.

Il supporto completo per l'API Java Message Service 2.0 è disponibile solo nel livello premium bus di servizio di Azure. È consigliabile usare questo livello.

Questo articolo illustra come usare le funzionalità di messaggistica bus di servizio dalle applicazioni Java usando lo standard dell'API JMS più diffuso. Queste funzionalità di messaggistica includono code e pubblicazione o sottoscrizione di argomenti. Un articolo complementare illustra come eseguire la stessa operazione usando l'API .NET bus di servizio di Azure. È possibile usare questi due articoli insieme per informazioni sulla messaggistica multipiattaforma usando advanced message queuing Protocol (AMQP) 1.0.

AMQP 1.0 è un protocollo di messaggistica efficiente, affidabile e a livello di rete che è possibile usare per creare applicazioni di messaggistica multipiattaforma affidabili.

Il supporto per AMQP 1.0 in bus di servizio significa che è possibile usare le funzionalità di accodamento e pubblicazione o sottoscrizione della messaggistica negoziata da un'ampia gamma di piattaforme usando un protocollo binario efficiente. È anche possibile creare applicazioni composte da componenti compilati usando una combinazione di linguaggi, framework e sistemi operativi.

Introduzione al bus di servizio

Questo articolo presuppone che sia già disponibile uno spazio dei nomi bus di servizio che contiene una coda denominata basicqueue. In caso contrario, è possibile creare lo spazio dei nomi e la coda usando il portale di Azure. Per altre informazioni su come creare spazi dei nomi e code del bus di servizio, vedere Introduzione alle code del bus di servizio.

Nota

Le code e gli argomenti partizionati supportano anche AMQP. Per altre informazioni, vedere le entità di messaggistica partizionate e Supporto di AMQP 1.0 per code e argomenti partizionati del bus di servizio.

Scaricare la libreria client JMS di AMQP 1.0

Per informazioni su dove scaricare la versione più recente della libreria client apache Qpid JMS AMQP 1.0, vedere il sito di download di Apache Qpid.

È necessario aggiungere i file JAR seguenti dall'archivio di distribuzione apache Qpid JMS AMQP 1.0 alla variabile di ambiente Java CLASSPATH quando si compilano ed eseguono applicazioni JMS con bus di servizio:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[versione].jar

Nota

I nomi e le versioni JAR di JMS potrebbero essere stati modificati. Per altre informazioni, vedere Qpid JMS AMQP 1.0.

Codice di applicazioni Java

Interfaccia di denominazione e directory Java

JMS usa l'interfaccia JNDI (Java Naming and Directory Interface) per creare una separazione tra i nomi logici e i nomi fisici. Due tipi di oggetti JMS vengono risolti tramite JNDI: Connessione ionFactory e Destination. JNDI utilizza un modello di provider in cui è possibile inserire diversi servizi directory per gestire le attività di risoluzione dei nomi. La libreria Apache Qpid JMS AMQP 1.0 include un semplice provider JNDI basato su file di proprietà configurato usando un file di proprietà del formato seguente:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Configurare il contesto JNDI e configurare l'oggetto Connessione ionFactory

Il stringa di connessione a cui si fa riferimento è quello disponibile nei criteri di accesso condiviso nella portale di Azure in Stringa di Connessione primaria.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Configurare le code di destinazione producer e consumer

La voce utilizzata per definire una destinazione nel provider JNDI del file di proprietà Qpid è del formato seguente.

Per creare una coda di destinazione per il producer:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Per creare una coda di destinazione per il consumer:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Scrivere l'applicazione JMS

Quando si usa JMS con bus di servizio, non sono necessarie API o opzioni speciali. Esistono alcune restrizioni che verranno trattate in un secondo momento. Come per qualsiasi applicazione JMS, la prima cosa necessaria è la configurazione dell'ambiente JNDI per poter risolvere un oggetto e destinazioni Connessione ionFactory.

Configurare l'oggetto InitialContext JNDI

L'ambiente JNDI viene configurato passando una tabella hash delle informazioni di configurazione nel costruttore della classe javax.naming.InitialContext. I due elementi necessari nella tabella hash sono il nome della classe initial context factory e l'URL del provider. Il codice seguente illustra come configurare l'ambiente JNDI per l'uso del provider JNDI basato su file delle proprietà Qpid con un file di proprietà denominato servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Una semplice applicazione JMS che usa una coda bus di servizio

Il programma di esempio seguente invia messaggi di testo JMS a una coda bus di servizio con il nome logico JNDI QUEUE e riceve nuovamente i messaggi.

È possibile accedere a tutte le informazioni sul codice sorgente e sulla configurazione dalla guida introduttiva alla coda JMS degli esempi di bus di servizio di Azure.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Eseguire l'applicazione

Passare il valore di Stringa di connessione in Criteri di accesso condiviso per eseguire l'applicazione. L'output seguente è il modulo che esegue l'applicazione:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Mapping della disposizione AMQP e dell'operazione del bus di servizio

Ecco come si traduce un'eliminazione AMQP in un'operazione di bus di servizio:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Argomenti relativi a JMS e argomenti di bus di servizio

L'uso di bus di servizio argomenti e sottoscrizioni tramite l'API JMS offre funzionalità di invio e ricezione di base. È una scelta comoda quando si esegue la conversione di applicazioni da altri broker di messaggi con API conformi a JMS, anche se bus di servizio argomenti differiscono dagli argomenti JMS e richiedono alcune modifiche.

bus di servizio argomenti instradano i messaggi in sottoscrizioni denominate, condivise e durevoli gestite tramite l'interfaccia di Gestione risorse di Azure, gli strumenti da riga di comando di Azure o il portale di Azure. Ogni sottoscrizione consente fino a 2.000 regole di selezione, ognuna delle quali può avere una condizione di filtro e, per i filtri SQL, anche un'azione di trasformazione dei metadati. Ogni condizione di filtro seleziona il messaggio di input da copiare nella sottoscrizione.

La ricezione di messaggi dalle sottoscrizioni è identica alla ricezione di messaggi dalle code. Ogni sottoscrizione ha una coda di messaggi non recapitabili associata e la possibilità di inoltrare automaticamente i messaggi a un'altra coda o argomenti.

Gli argomenti JMS consentono ai client di creare in modo dinamico sottoscrittori non permanenti e durevoli che, facoltativamente, consentono di filtrare i messaggi con selettori di messaggi. Queste entità non condivise non sono supportate da bus di servizio. La sintassi della regola di filtro SQL per bus di servizio è simile alla sintassi del selettore di messaggi supportata da JMS.

Il lato editore dell'argomento JMS è compatibile con bus di servizio, come illustrato in questo esempio, ma i sottoscrittori dinamici non lo sono. Le API JMS correlate alla topologia seguenti non sono supportate con bus di servizio.

Funzionalità non supportate e restrizioni

Quando si usa JMS su AMQP 1.0 con bus di servizio, esistono le restrizioni seguenti:

  • Per sessione è consentito un solo oggetto MessageProducer o MessageConsumer . Se è necessario creare più oggetti MessageProducer o MessageConsumer in un'applicazione, creare una sessione dedicata per ognuna di esse.
  • Le sottoscrizioni di argomenti volatili non sono attualmente supportate.
  • Gli oggetti MessageSelector non sono attualmente supportati.
  • Le transazioni distribuite non sono supportate, ma le sessioni transazionate sono supportate.

bus di servizio suddivide il piano di controllo dal piano dati, quindi non supporta diverse funzioni di topologia dinamica di JMS.

Metodo non supportato Replace with
createDurableSubscriber Creare una sottoscrizione di argomento che porti il selettore di messaggi.
createDurableConsumer Creare una sottoscrizione di argomento che porti il selettore di messaggi.
createSharedConsumer bus di servizio argomenti sono sempre condivisibili. Vedere la sezione "Argomenti di JMS e bus di servizio argomenti".
createSharedDurableConsumer bus di servizio argomenti sono sempre condivisibili. Vedere la sezione "Argomenti di JMS e bus di servizio argomenti".
createTemporaryTopic Creare un argomento tramite l'API di gestione, gli strumenti o il portale con AutoDeleteOnIdle impostato su un periodo di scadenza.
createTopic Creare un argomento tramite l'API di gestione, gli strumenti o il portale.
unsubscribe Eliminare l'API di gestione degli argomenti, gli strumenti o il portale.
createBrowser Non supportato. Usare la funzionalità Peek() dell'API bus di servizio.
createQueue Creare una coda tramite l'API di gestione, gli strumenti o il portale.
createTemporaryQueue Creare una coda tramite l'API di gestione, gli strumenti o il portale con AutoDeleteOnIdle impostato su un periodo di scadenza.
receiveNoWait Usare il metodo receive() fornito dall'SDK bus di servizio e specificare un timeout molto basso o zero.

Riepilogo

Questo articolo illustra come usare bus di servizio funzionalità di messaggistica negoziata, ad esempio code e pubblicare o sottoscrivere argomenti, da Java usando l'API JMS più diffusa e AMQP 1.0.

È anche possibile usare bus di servizio AMQP 1.0 da altri linguaggi, ad esempio .NET, C, Python e PHP. I componenti creati usando questi linguaggi diversi possono scambiare messaggi in modo affidabile e a massima fedeltà usando il supporto AMQP 1.0 in bus di servizio.

Passaggi successivi