Επεξεργασία

Κοινή χρήση μέσω


AMQP 1.0 in Azure Service Bus and Event Hubs protocol guide

The Advanced Message Queueing Protocol 1.0 is a standardized framing and transfer protocol for asynchronously, securely, and reliably transferring messages between two parties. It's the primary protocol of Azure Service Bus Messaging and Azure Event Hubs.

AMQP 1.0 is the result of broad industry collaboration that brought together middleware vendors, such as Microsoft and Red Hat, with many messaging middleware users such as JP Morgan Chase representing the financial services industry. The technical standardization forum for the AMQP protocol and extension specifications is OASIS, and it has achieved formal approval as an international standard as ISO/IEC 19494:2014.

Goals

This article summarizes the core concepts of the AMQP 1.0 messaging specification along extension specifications developed by the OASIS AMQP Technical Committee and explains how Azure Service Bus implements and builds on these specifications.

The goal is for any developer using any existing AMQP 1.0 client stack on any platform to be able to interact with Azure Service Bus via AMQP 1.0.

Common general-purpose AMQP 1.0 stacks, such as Apache Qpid Proton or AMQP.NET Lite, implement all core AMQP 1.0 protocol elements like sessions or links. Those foundational elements are sometimes wrapped with a higher-level API; Apache Proton even offers two, the imperative Messenger API and the reactive Reactor API.

In the following discussion, we assume that the management of AMQP connections, sessions, and links and the handling of frame transfers and flow control are handled by the respective stack (such as Apache Proton-C) and don't require much if any specific attention from application developers. We abstractly assume the existence of a few API primitives like the ability to connect, and to create some form of sender and receiver abstraction objects, which then have some shape of send() and receive() operations, respectively.

When discussing advanced capabilities of Azure Service Bus, such as message browsing or management of sessions, those features are explained in AMQP terms, but also as a layered pseudo-implementation on top of this assumed API abstraction.

What is AMQP?

AMQP is a framing and transfer protocol. Framing means that it provides structure for binary data streams that flow in either direction of a network connection. The structure provides delineation for distinct blocks of data, called frames, to be exchanged between the connected parties. The transfer capabilities make sure that both communicating parties can establish a shared understanding about when frames shall be transferred, and when transfers shall be considered complete.

Unlike earlier expired draft versions from the AMQP working group that are still in use by a few message brokers, the working group's final, and standardized AMQP 1.0 protocol doesn't prescribe the presence of a message broker or any particular topology for entities inside a message broker.

The protocol can be used for symmetric peer-to-peer communication, for interaction with message brokers that support queues and publish/subscribe entities, as Azure Service Bus does. It can also be used for interaction with messaging infrastructure where the interaction patterns are different from regular queues, as is the case with Azure Event Hubs. An event hub acts like a queue when events are sent to it, but acts more like a serial storage service when events are read from it; it somewhat resembles a tape drive. The client picks an offset into the available data stream and is then served all events from that offset to the latest available.

The AMQP 1.0 protocol is designed to be extensible, enabling further specifications to enhance its capabilities. The three extension specifications discussed in this document illustrate it. For communication over existing HTTPS/WebSockets infrastructure, configuring the native AMQP TCP ports might be difficult. A binding specification defines how to layer AMQP over WebSockets. For interacting with the messaging infrastructure in a request/response fashion for management purposes or to provide advanced functionality, the AMQP management specification defines the required basic interaction primitives. For federated authorization model integration, the AMQP claims-based-security specification defines how to associate and renew authorization tokens associated with links.

Basic AMQP scenarios

This section explains the basic usage of AMQP 1.0 with Azure Service Bus, which includes creating connections, sessions, and links, and transferring messages to and from Service Bus entities such as queues, topics, and subscriptions.

The most authoritative source to learn about how AMQP works is the AMQP 1.0 specification, but the specification was written to precisely guide implementation and not to teach the protocol. This section focuses on introducing as much terminology as needed for describing how Service Bus uses AMQP 1.0. For a more comprehensive introduction to AMQP and a broader discussion of AMQP 1.0, review this video course.

Connections and sessions

AMQP calls the communicating programs containers; those contain nodes, which are the communicating entities inside of those containers. A queue can be such a node. AMQP allows for multiplexing, so a single connection can be used for many communication paths between nodes; for example, an application client can concurrently receive from one queue and send to another queue over the same network connection.

Diagram showing Sessions and Connections between containers.

The network connection is thus anchored on the container. It's initiated by the container in the client role making an outbound TCP socket connection to a container in the receiver role, which listens for and accepts inbound TCP connections. The connection handshake includes negotiating the protocol version, declaring or negotiating the use of Transport Level Security (TLS)/Secure Sockets Layer (SSL), and an authentication/authorization handshake at the connection scope that is based on SASL.

Azure Service Bus or Azure Event Hubs requires the use of TLS at all times. It supports connections over TCP port 5671, whereby the TCP connection is first overlaid with TLS before entering the AMQP protocol handshake, and also supports connections over TCP port 5672 whereby the server immediately offers a mandatory upgrade of connection to TLS using the AMQP-prescribed model. The AMQP WebSockets binding creates a tunnel over TCP port 443 that is then equivalent to AMQP 5671 connections.

After setting up the connection and TLS, Service Bus offers two SASL mechanism options:

  • SASL PLAIN is commonly used for passing username and password credentials to a server. Service Bus doesn't have accounts, but named Shared Access Security rules, which confer rights and are associated with a key. The name of a rule is used as the user name and the key (as base64 encoded text) is used as the password. The rights associated with the chosen rule govern the operations allowed on the connection.
  • SASL ANONYMOUS is used for bypassing SASL authorization when the client wants to use the claims-based-security (CBS) model that is described later. With this option, a client connection can be established anonymously for a short time during which the client can only interact with the CBS endpoint and the CBS handshake must complete.

After the transport connection is established, the containers each declare the maximum frame size they're willing to handle, and after an idle timeout they’ll unilaterally disconnect if there's no activity on the connection.

They also declare how many concurrent channels are supported. A channel is a unidirectional, outbound, virtual transfer path on top of the connection. A session takes a channel from each of the interconnected containers to form a bi-directional communication path.

Sessions have a window-based flow control model; when a session is created, each party declares how many frames it's willing to accept into its receive window. As the parties exchange frames, transferred frames fill that window and transfers stop when the window is full and until the window gets reset or expanded using the flow performative (performative is the AMQP term for protocol-level gestures exchanged between the two parties).

This window-based model is roughly analogous to the TCP concept of window-based flow control, but at the session level inside the socket. The protocol’s concept of allowing for multiple concurrent sessions exists so that high priority traffic could be rushed past throttled normal traffic, like on a highway express lane.

Azure Service Bus currently uses exactly one session for each connection. The Service Bus maximum frame-size is 262,144 bytes (256-K bytes) for Service Bus Standard. It's 1048576 (100 MB) for Service Bus Premium and Event Hubs. Service Bus doesn't impose any particular session-level throttling windows, but resets the window regularly as part of link-level flow control (see the next section).

Connections, channels, and sessions are ephemeral. If the underlying connection collapses, connections, TLS tunnel, SASL authorization context, and sessions must be reestablished.

AMQP outbound port requirements

Clients that use AMQP connections over TCP require ports 5671 and 5672 to be opened in the local firewall. Along with these ports, it might be necessary to open additional ports if the EnableLinkRedirect feature is enabled. EnableLinkRedirect is a new messaging feature that helps skip one-hop while receiving messages, thus helping to boost throughput. The client would start communicating directly with the back-end service over port range 104XX as shown in the following image.

List of destination ports

A .NET client would fail with a SocketException ("An attempt was made to access a socket in a way forbidden by its access permissions") if these ports are blocked by the firewall. The feature can be disabled by setting EnableAmqpLinkRedirect=false in the connection string, which forces the clients to communicate with the remote service over port 5671.

The AMQP WebSocket binding provides a mechanism for tunneling an AMQP connection over a WebSocket transport. This binding creates a tunnel over the TCP port 443, which is equivalent to AMQP 5671 connections. Use AMQP WebSockets if you are behind a firewall that blocks TCP connections over ports 5671, 5672 but allows TCP connections over port 443 (https).

AMQP transfers messages over links. A link is a communication path created over a session that enables transferring messages in one direction; the transfer status negotiation is over the link and bi-directional between the connected parties.

Screenshot showing a Session carrying a link connection between two containers.

Links can be created by either container at any time and over an existing session, which makes AMQP different from many other protocols, including HTTP and MQTT, where the initiation of transfers and transfer path is an exclusive privilege of the party creating the socket connection.

The link-initiating container asks the opposite container to accept a link and it chooses a role of either sender or receiver. Therefore, either container can initiate creating unidirectional or bi-directional communication paths, with the latter modeled as pairs of links.

Links are named and associated with nodes. As stated in the beginning, nodes are the communicating entities inside a container.

In Service Bus, a node is directly equivalent to a queue, a topic, a subscription, or a deadletter subqueue of a queue or subscription. The node name used in AMQP is therefore the relative name of the entity inside of the Service Bus namespace. If a queue is named myqueue, that’s also its AMQP node name. A topic subscription follows the HTTP API convention by being sorted into a "subscriptions" resource collection and thus, a subscription sub on a topic mytopic has the AMQP node name mytopic/subscriptions/sub.

The connecting client is also required to use a local node name for creating links; Service Bus isn't prescriptive about those node names and doesn't interpret them. AMQP 1.0 client stacks generally use a scheme to assure that these ephemeral node names are unique in the scope of the client.

Transfers

Once a link has been established, messages can be transferred over that link. In AMQP, a transfer is executed with an explicit protocol gesture (the transfer performative) that moves a message from sender to receiver over a link. A transfer is complete when it is “settled”, meaning that both parties have established a shared understanding of the outcome of that transfer.

A diagram showing a message's transfer between the Sender and Receiver and disposition that results from it.

In the simplest case, the sender can choose to send messages "pre-settled," meaning that the client isn’t interested in the outcome and the receiver doesn't provide any feedback about the outcome of the operation. This mode is supported by Service Bus at the AMQP protocol level, but not exposed in any of the client APIs.

The regular case is that messages are being sent unsettled, and the receiver then indicates acceptance or rejection using the disposition performative. Rejection occurs when the receiver can't accept the message for any reason, and the rejection message contains information about the reason, which is an error structure defined by AMQP. If messages are rejected due to internal errors inside of Service Bus, the service returns extra information inside that structure that can be used for providing diagnostics hints to support personnel if you're filing support requests. You learn more details about errors later.

A special form of rejection is the released state, which indicates that the receiver has no technical objection to the transfer, but also no interest in settling the transfer. That case exists, for example, when a message is delivered to a Service Bus client, and the client chooses to "abandon" the message because it can't perform the work resulting from processing the message; the message delivery itself isn't at fault. A variation of that state is the modified state, which allows changes to the message as it is released. That state isn't used by Service Bus at present.

The AMQP 1.0 specification defines a further disposition state called received, that specifically helps to handle link recovery. Link recovery allows reconstituting the state of a link and any pending deliveries on top of a new connection and session, when the prior connection and session were lost.

Service Bus doesn't support link recovery; if the client loses the connection to Service Bus with an unsettled message transfer pending, that message transfer is lost, and the client must reconnect, reestablish the link, and retry the transfer.

As such, Service Bus and Event Hubs support "at least once" transfer where the sender can be assured for the message having been stored and accepted, but don't support "exactly once" transfers at the AMQP level, where the system would attempt to recover the link and continue to negotiate the delivery state to avoid duplication of the message transfer.

To compensate for possible duplicate sends, Service Bus supports duplicate detection as an optional feature on queues and topics. Duplicate detection records the message IDs of all incoming messages during a user-defined time window, then silently drops all messages sent with the same message-IDs during that same window.

Flow control

In addition to the session-level flow control model that previously discussed, each link has its own flow control model. Session-level flow control protects the container from having to handle too many frames at once, link-level flow control puts the application in charge of how many messages it wants to handle from a link and when.

Screenshot of a log showing Source, Destination, Source Port, Destination Port, and Protocol Name. In the first row the Destination Port 10401 (0x28 A 1) is outlined in black.

On a link, transfers can only happen when the sender has enough link credit. Link credit is a counter set by the receiver using the flow performative, which is scoped to a link. When the sender is assigned link credit, it attempts to use up that credit by delivering messages. Each message delivery decrements the remaining link credit by 1. When the link credit is used, deliveries stop.

When Service Bus is in the receiver role, it instantly provides the sender with ample link credit, so that messages can be sent immediately. As link credit is used, Service Bus occasionally sends a flow performative to the sender to update the link credit balance.

In the sender role, Service Bus sends messages to use up any outstanding link credit.

A "receive" call at the API level translates into a flow performative being sent to Service Bus by the client, and Service Bus consumes that credit by taking the first available, unlocked message from the queue, locking it, and transferring it. If there's no message readily available for delivery, any outstanding credit by any link established with that particular entity remains recorded in order of arrival, and messages are locked and transferred as they become available, to use any outstanding credit.

The lock on a message is released when the transfer is settled into one of the terminal states accepted, rejected, or released. The message is removed from Service Bus when the terminal state is accepted. It remains in Service Bus and is delivered to the next receiver when the transfer reaches any of the other states. Service Bus automatically moves the message into the entity's deadletter queue when it reaches the maximum delivery count allowed for the entity due to repeated rejections or releases.

Even though the Service Bus APIs don't directly expose such an option today, a lower-level AMQP protocol client can use the link-credit model to turn the "pull-style" interaction of issuing one unit of credit for each receive request into a "push-style" model by issuing a large number of link credits and then receive messages as they become available without any further interaction. Push is supported through the ServiceBusProcessor.PrefetchCount or ServiceBusReceiver.PrefetchCount property settings. When they're non-zero, the AMQP client uses it as the link credit.

In this context, it's important to understand that the clock for the expiration of the lock on the message inside the entity starts when the message is taken from the entity, not when the message is put on the wire. Whenever the client indicates readiness to receive messages by issuing link credit, it's therefore expected to be actively pulling messages across the network and be ready to handle them. Otherwise the message lock might have expired before the message is even delivered. The use of link-credit flow control should directly reflect the immediate readiness to deal with available messages dispatched to the receiver.

In summary, the following sections provide a schematic overview of the performative flow during different API interactions. Each section describes a different logical operation. Some of those interactions might be "lazy," meaning they might only be performed when required. Creating a message sender might not cause a network interaction until the first message is sent or requested.

The arrows in the following table show the performative flow direction.

Create message receiver

Client Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={entity name},<br/>target={client link ID}<br/>) Client attaches to entity as receiver
Service Bus replies attaching its end of the link <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={entity name},<br/>target={client link ID}<br/>)

Create message sender

Client Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) No action
No action <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={client link ID},<br/>target={entity name}<br/>)

Create message sender (error)

Client Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) No action
No action <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source=null,<br/>target=null<br/>)<br/><br/><-- detach(<br/>handle={numeric handle},<br/>closed=**true**,<br/>error={error info}<br/>)

Close message receiver/sender

Client Service Bus
--> detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>) No action
No action <-- detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>)

Send (success)

Client Service Bus
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) No action
No action <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>)

Send (error)

Client Service Bus
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) No action
No action <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**rejected**(<br/>error={error info}<br/>)<br/>)

Receive

Client Service Bus
--> flow(<br/>link-credit=1<br/>) No action
No action < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
--> disposition(<br/>role=**receiver**,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>) No action

Multi-message receive

Client Service Bus
--> flow(<br/>link-credit=3<br/>) No action
No action < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
No action < transfer(<br/>delivery-id={numeric handle+1},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
No action < transfer(<br/>delivery-id={numeric handle+2},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
--> disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID+2},<br/>settled=**true**,<br/>state=**accepted**<br/>) No action

Messages

The following sections explain which properties from the standard AMQP message sections are used by Service Bus and how they map to the Service Bus API set.

Any property that application needs to define should be mapped to AMQP's application-properties map.

Field Name Usage API name
durable - -
priority - -
ttl Time to live for this message TimeToLive
first-acquirer - -
delivery-count - DeliveryCount

Properties

Field Name Usage API name
message-id Application-defined, free-form identifier for this message. Used for duplicate detection. MessageId
user-id Application-defined user identifier, not interpreted by Service Bus. Not accessible through the Service Bus API.
to Application-defined destination identifier, not interpreted by Service Bus. To
subject Application-defined message purpose identifier, not interpreted by Service Bus. Subject
reply-to Application-defined reply-path indicator, not interpreted by Service Bus. ReplyTo
correlation-id Application-defined correlation identifier, not interpreted by Service Bus. CorrelationId
content-type Application-defined content-type indicator for the body, not interpreted by Service Bus. ContentType
content-encoding Application-defined content-encoding indicator for the body, not interpreted by Service Bus. Not accessible through the Service Bus API.
absolute-expiry-time Declares at which absolute instant the message expires. Ignored on input (header TTL is observed), authoritative on output. Not accessible through the Service Bus API.
creation-time Declares at which time the message was created. Not used by Service Bus Not accessible through the Service Bus API.
group-id Application-defined identifier for a related set of messages. Used for Service Bus sessions. SessionId
group-sequence Counter identifying the relative sequence number of the message inside a session. Ignored by Service Bus. Not accessible through the Service Bus API.
reply-to-group-id - ReplyToSessionId

Message annotations

There are few other service bus message properties, which aren't part of AMQP message properties, and are passed along as MessageAnnotations on the message.

Annotation Map Key Usage API name
x-opt-scheduled-enqueue-time Declares at which time the message should appear on the entity ScheduledEnqueueTime
x-opt-partition-key Application-defined key that dictates which partition the message should land in. PartitionKey
x-opt-via-partition-key Application-defined partition-key value when a transaction is to be used to send messages via a transfer queue. TransactionPartitionKey
x-opt-enqueued-time Service-defined UTC time representing the actual time of enqueuing the message. Ignored on input. EnqueuedTime
x-opt-sequence-number Service-defined unique number assigned to a message. SequenceNumber
x-opt-offset Service-defined enqueued sequence number of the message. EnqueuedSequenceNumber
x-opt-locked-until Service-defined. The date and time until which the message will be locked in the queue/subscription. LockedUntil
x-opt-deadletter-source Service-Defined. If the message is received from dead letter queue, it represents the source of the original message. DeadLetterSource

Transaction capability

A transaction groups two or more operations together into an execution scope. By nature, such a transaction must ensure that all operations belonging to a given group of operations either succeed or fail jointly. The operations are grouped by an identifier txn-id.

For transactional interaction, the client acts as a transaction controller , which controls the operations that should be grouped together. Service Bus Service acts as a transactional resource and performs work as requested by the transaction controller.

The client and service communicate over a control link , which is established by the client. The declare and discharge messages are sent by the controller over the control link to allocate and complete transactions respectively (they don't represent the demarcation of transactional work). The actual send/receive isn't performed on this link. Each transactional operation requested is explicitly identified with the desired txn-id and therefore might occur on any link on the Connection. If the control link is closed while there exist non-discharged transactions it created, then all such transactions are immediately rolled back, and attempts to perform further transactional work on them will lead to failure. Messages on control link must not be pre settled.

Every connection has to initiate its own control link to be able to start and end transactions. The service defines a special target that functions as a coordinator. The client/controller establishes a control link to this target. Control link is outside the boundary of an entity, that is, same control link can be used to initiate and discharge transactions for multiple entities.

Starting a transaction

To begin transactional work, the controller must obtain a txn-id from the coordinator. It does this by sending a declare type message. If the declaration is successful, the coordinator responds with a disposition outcome, which carries the assigned txn-id.

Client (Controller) Direction Service Bus (Coordinator)
attach(<br/>name={link name},<br/>... ,<br/>role=**sender**,<br/>target=**Coordinator**<br/>) ------>
<------ attach(<br/>name={link name},<br/>... ,<br/>target=Coordinator()<br/>)
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (**Declare()**)} ------>
<------ disposition( <br/> first=0, last=0, <br/>state=**Declared**(<br/>**txn-id**={transaction ID}<br/>))

Discharging a transaction

The controller concludes the transactional work by sending a discharge message to the coordinator. The controller indicates that it wishes to commit or roll back the transactional work by setting the fail flag on the discharge body. If the coordinator is unable to complete the discharge, the message is rejected with this outcome carrying the transaction-error.

Note: fail=true refers to Rollback of a transaction, and fail=false refers to Commit.

Client (Controller) Direction Service Bus (Coordinator)
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} ------>
<------ disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>))
. . .
Transactional work
on other links
. . .
transfer(<br/>delivery-id=57, ...)<br/>{ AmqpValue (<br/>**Discharge(txn-id=0,<br/>fail=false)**)} ------>
<------ disposition( <br/> first=57, last=57, <br/>state=**Accepted()**)

Sending a message in a transaction

All transactional work is done with the transactional delivery state transactional-state that carries the txn-id. When sending messages, the transactional-state is carried by the message's transfer frame.

Client (Controller) Direction Service Bus (Coordinator)
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} ------>
<------ disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>))
transfer(<br/>handle=1,<br/>delivery-id=1, <br/>**state=<br/>TransactionalState(<br/>txn-id=0)**)<br/>{ payload } ------>
<------ disposition( <br/> first=1, last=1, <br/>state=**TransactionalState(<br/>txn-id=0,<br/>outcome=Accepted()**))

Disposing a message in a transaction

Message disposition includes operations like Complete / Abandon / DeadLetter / Defer. To perform these operations within a transaction, pass the transactional-state with the disposition.

Client (Controller) Direction Service Bus (Coordinator)
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} ------>
<------ disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>))
<------ transfer(<br/>handle=2,<br/>delivery-id=11, <br/>state=null)<br/>{ payload }
disposition( <br/> first=11, last=11, <br/>state=**TransactionalState(<br/>txn-id=0,<br/>outcome=Accepted()**)) ------>

Advanced Service Bus capabilities

This section covers advanced capabilities of Azure Service Bus that are based on draft extensions to AMQP, currently being developed in the OASIS Technical Committee for AMQP. Service Bus implements the latest versions of these drafts and adopts changes introduced as those drafts reach standard status.

Note

Service Bus Messaging advanced operations are supported through a request/response pattern. The details of these operations are described in the article AMQP 1.0 in Service Bus: request-response-based operations.

AMQP management

The AMQP management specification is the first of the draft extensions discussed in this article. This specification defines a set of protocols layered on top of the AMQP protocol that allows management interactions with the messaging infrastructure over AMQP. The specification defines generic operations such as create, read, update, and delete for managing entities inside a messaging infrastructure and a set of query operations.

All those gestures require a request/response interaction between the client and the messaging infrastructure, and therefore the specification defines how to model that interaction pattern on top of AMQP: the client connects to the messaging infrastructure, initiates a session, and then creates a pair of links. On one link, the client acts as sender and on the other it acts as receiver, thus creating a pair of links that can act as a bi-directional channel.

Logical Operation Client Service Bus
Create Request Response Path --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=**null**,<br/>target=”myentity/$management”<br/>) No action
Create Request Response Path No action \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=null,<br/>target=”myentity”<br/>)
Create Request Response Path --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=”myentity/$management”,<br/>target=”myclient$id”<br/>)
Create Request Response Path No action \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=”myentity”,<br/>target=”myclient$id”<br/>)

Having that pair of links in place, the request/response implementation is straightforward: a request is a message sent to an entity inside the messaging infrastructure that understands this pattern. In that request-message, the reply-to field in the properties section is set to the target identifier for the link onto which to deliver the response. The handling entity processes the request, and then delivers the reply over the link whose target identifier matches the indicated reply-to identifier.

The pattern obviously requires that the client container and the client-generated identifier for the reply destination are unique across all clients and, for security reasons, also difficult to predict.

The message exchanges used for the management protocol and for all other protocols that use the same pattern happen at the application level; they don't define new AMQP protocol-level gestures. That's intentional, so that applications can take immediate advantage of these extensions with compliant AMQP 1.0 stacks.

Service Bus doesn't currently implement any of the core features of the management specification, but the request/response pattern defined by the management specification is foundational for the claims-based-security feature and for nearly all of the advanced capabilities discussed in the following sections:

Claims-based authorization

The AMQP Claims-Based-Authorization (CBS) specification draft builds on the management specification request/response pattern, and describes a generalized model for how to use federated security tokens with AMQP.

The default security model of AMQP discussed in the introduction is based on SASL and integrates with the AMQP connection handshake. Using SASL has the advantage that it provides an extensible model for which a set of mechanisms have been defined from which any protocol that formally leans on SASL can benefit. Among those mechanisms are “PLAIN” for transfer of usernames and passwords, “EXTERNAL” to bind to TLS-level security, “ANONYMOUS” to express the absence of explicit authentication/authorization, and a broad variety of additional mechanisms that allow passing authentication and/or authorization credentials or tokens.

AMQP’s SASL integration has two drawbacks:

  • All credentials and tokens are scoped to the connection. A messaging infrastructure might want to provide differentiated access control on a per-entity basis; for example, allowing the bearer of a token to send to queue A but not to queue B. With the authorization context anchored on the connection, it’s not possible to use a single connection and yet use different access tokens for queue A and queue B.
  • Access tokens are typically only valid for a limited time. This validity requires the user to periodically reacquire tokens and provides an opportunity to the token issuer to refuse issuing a fresh token if the user’s access permissions have changed. AMQP connections might last for long periods of time. The SASL model only provides a chance to set a token at connection time, which means that the messaging infrastructure either has to disconnect the client when the token expires or it needs to accept the risk of allowing continued communication with a client who’s access rights might have been revoked in the interim.

The AMQP CBS specification, implemented by Service Bus, enables an elegant workaround for both of those issues: It allows a client to associate access tokens with each node, and to update those tokens before they expire, without interrupting the message flow.

CBS defines a virtual management node, named $cbs, to be provided by the messaging infrastructure. The management node accepts tokens on behalf of any other nodes in the messaging infrastructure.

The protocol gesture is a request/reply exchange as defined by the management specification. That means the client establishes a pair of links with the $cbs node and then passes a request on the outbound link, and then waits for the response on the inbound link.

The request message has the following application properties:

Key Optional Value Type Value Contents
operation No string put-token
type No string The type of the token being put.
name No string The "audience" to which the token applies.
expiration Yes timestamp The expiry time of the token.

The name property identifies the entity with which the token shall be associated. In Service Bus it's the path to the queue, or topic/subscription. The type property identifies the token type:

Token Type Token Description Body Type Notes
jwt JSON Web Token (JWT) AMQP Value (string)
servicebus.windows.net:sastoken Service Bus SAS Token AMQP Value (string) -

Tokens confer rights. Service Bus knows about three fundamental rights: "Send" enables sending, "Listen" enables receiving, and "Manage" enables manipulating entities. Service Bus SAS tokens refer to rules configured on the namespace or entity, and those rules are configured with rights. Signing the token with the key associated with that rule thus makes the token express the respective rights. The token associated with an entity using put-token permits the connected client to interact with the entity per the token rights. A link where the client takes on the sender role requires the "Send" right; taking on the receiver role requires the "Listen" right.

The reply message has the following application-properties values

Key Optional Value Type Value Contents
status-code No int HTTP response code [RFC2616].
status-description Yes string Description of the status.

The client can call put-token repeatedly and for any entity in the messaging infrastructure. The tokens are scoped to the current client and anchored on the current connection, meaning the server drops any retained tokens when the connection drops.

The current Service Bus implementation only allows CBS in conjunction with the SASL method "ANONYMOUS." A SSL/TLS connection must always exist prior to the SASL handshake.

The ANONYMOUS mechanism must therefore be supported by the chosen AMQP 1.0 client. Anonymous access means that the initial connection handshake, including creating of the initial session happens without Service Bus knowing who is creating the connection.

Once the connection and session is established, attaching the links to the $cbs node and sending the put-token request are the only permitted operations. A valid token must be set successfully using a put-token request for some entity node within 20 seconds after the connection has been established, otherwise the connection is unilaterally dropped by Service Bus.

The client is subsequently responsible for keeping track of token expiration. When a token expires, Service Bus promptly drops all links on the connection to the respective entity. To prevent problem occurring, the client can replace the token for the node with a new one at any time through the virtual $cbs management node with the same put-token gesture, and without getting in the way of the payload traffic that flows on different links.

Send-via functionality

Send-via / Transfer sender is a functionality that lets service bus forward a given message to a destination entity through another entity. This feature is used to perform operations across entities in a single transaction.

With this functionality, you create a sender and establish the link to the via-entity. While establishing the link, additional information is passed to establish the true destination of the messages/transfers on this link. Once the attach has been successful, all the messages sent on this link are automatically forwarded to the destination-entity through via-entity.

Note: Authentication has to be performed for both via-entity and destination-entity before establishing this link.

Client Direction Service Bus
attach(<br/>name={link name},<br/>role=sender,<br/>source={client link ID},<br/>target=**{via-entity}**,<br/>**properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )]** ) ------>
<------ attach(<br/>name={link name},<br/>role=receiver,<br/>source={client link ID},<br/>target={via-entity},<br/>properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )] )

To learn more about AMQP, see Service Bus AMQP overview.