Updated publisher-confirms.txt

AlexanderSemenets authored
revision bc7e4d564c8a44eb5a4e60f158e104d202be78d7
publisher-confirms
# Publisher Confirms

Consumer Acknowledgements and Publisher Confirms
Introduction
Systems that use a messaging broker such as RabbitMQ are by definition distributed. Since protocol methods (messages) sent are not guaranteed to reach the peer or be successfully processed by it, both publishers and consumers need a mechanism for delivery and processing confirmation. Several messaging protocols supported by RabbitMQ provide such features. This guide covers the features in AMQP 0-9-1 but the idea is largely the same in other protocols (STOMP, MQTT, et cetera).

Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms.

(Consumer) Delivery Acknowledgements
When RabbitMQ delivers a message to a consumer, it needs to know when to consider the message successfully sent. What kind of logic is optimal depends on the system. It is therefore primarily an application decision. In AMQP 0-9-1 it is made when a consumer is registered using the basic.consume method or a message is fetched on demand with the basic.get method.

If you prefer more example-oriented, step-by-step material, consumer acknowledgements are also covered in RabbitMQ tutorial #2.

Delivery Identifiers: Delivery Tags

Before we proceed to discuss other topics it is important to explain how deliveries are identified (and acknowledgements indicate their respective deliveries). When a consumer (subscription) is registered, messages will be delivered (pushed) by RabbitMQ using the basic.deliver method. The method carries a delivery tag, which uniquely identifies the delivery on a channel. Delivery tags are therefore scoped per channel.

Delivery tags are monotonically growing positive integers and are presented as such by client libraries. Client library methods that acknowledge deliveries take a delivery tag as an argument.

Acknowledgement Modes

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit ("manual") client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

basic.ack is used for positive acknowledgements
basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
basic.reject is used for negative acknowledgements but has one limitation compared to basic.nack
Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn't processed but still should be deleted.

Acknowledging Multiple Deliveries at Once

Manual acknowledgements can be batched to reduce network traffic. This is done by setting the multiple field of acknowledgement methods (see above) to true. Note that basic.reject doesn't historically have the field and that's why basic.nack was introduced by RabbitMQ as a protocol extension.

When the multiple field is set to true, RabbitMQ will acknowledge all outstanding delivery tags up to and including the tag specified in the acknowledgement. Like everything else related to acknowledgements, this is scoped per channel. For example, given that there are delivery tags 5, 6, 7, and 8 unacknowledged on channel Ch, when an acknowledgement frame arrives on that channel with delivery_tag set to 8 and multiple set to true, all tags from 5 to 8 will be acknowledged. If multiple was set to false, deliveries 5, 6, and 7 would still be unacknowledged.
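The effect of the multiple field can be illustrated with a small in-memory model. This is only a sketch: the class UnackedDeliveries and its methods are hypothetical names made up for illustration and do not talk to a broker. In the RabbitMQ Java client the corresponding acknowledgement call is Channel.basicAck(deliveryTag, multiple).

```java
import java.util.TreeSet;

/** Toy model of one channel's unacknowledged delivery tags (hypothetical, not the client API). */
class UnackedDeliveries {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Record a delivery that was pushed to the consumer. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Apply an acknowledgement, honouring the multiple field as described above. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is acknowledged.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Only the named delivery is acknowledged.
            unacked.remove(deliveryTag);
        }
    }

    /** Snapshot of the tags that are still unacknowledged. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        UnackedDeliveries single = new UnackedDeliveries();
        UnackedDeliveries batched = new UnackedDeliveries();
        for (long tag = 5; tag <= 8; tag++) {
            single.delivered(tag);
            batched.delivered(tag);
        }
        single.ack(8, false);
        batched.ack(8, true);
        System.out.println(single.outstanding());  // [5, 6, 7]
        System.out.println(batched.outstanding()); // []
    }
}
```

The sorted set makes the "up to and including" rule a single range operation, which is the same idea the publisher-side confirm example later in this page relies on.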

Channel Prefetch Setting (QoS)

Because messages are sent (pushed) to clients asynchronously, there is usually more than one message "in flight" on a channel at any given moment. In addition, manual acknowledgements from clients are also inherently asynchronous in nature. So there's a sliding window of delivery tags that are unacknowledged. Developers would often prefer to cap the size of this window to avoid the unbounded buffer problem on the consumer end. This is done by setting a "prefetch count" value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel. Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged.

For example, given that there are delivery tags 5, 6, 7, and 8 unacknowledged on channel Ch and channel Ch's prefetch count is set to 4, RabbitMQ will not push any more deliveries on Ch unless at least one of the outstanding deliveries is acknowledged. When an acknowledgement frame arrives on that channel with delivery_tag set to 8, RabbitMQ will notice and deliver one more message.
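The sliding window can be sketched with a toy model. Everything here is an assumption made for illustration: PrefetchWindow is a hypothetical class, tags start at 1, and no broker is involved. With the RabbitMQ Java client the limit itself would be set with Channel.basicQos(prefetchCount).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.TreeSet;

/** Toy model of a channel with a prefetch count (hypothetical, not the client API). */
class PrefetchWindow {
    private final int prefetchCount;
    private final Queue<String> ready = new ArrayDeque<>(); // messages waiting in the queue
    private final TreeSet<Long> unacked = new TreeSet<>();  // delivery tags in flight
    private long nextTag = 1;

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void enqueue(String message) {
        ready.add(message);
    }

    /** Push as many messages as the window allows and return the tags used. */
    List<Long> deliver() {
        List<Long> delivered = new ArrayList<>();
        while (!ready.isEmpty() && unacked.size() < prefetchCount) {
            ready.remove();
            unacked.add(nextTag);
            delivered.add(nextTag++);
        }
        return delivered;
    }

    /** A single (multiple = false) acknowledgement frees one slot in the window. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    int inFlight() {
        return unacked.size();
    }

    public static void main(String[] args) {
        PrefetchWindow ch = new PrefetchWindow(4);
        for (int i = 0; i < 6; i++) {
            ch.enqueue("message-" + i);
        }
        System.out.println(ch.deliver()); // [1, 2, 3, 4]
        System.out.println(ch.deliver()); // [] because the window is full
        ch.ack(4);
        System.out.println(ch.deliver()); // [5]
    }
}
```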

It's worth reiterating that the flow of deliveries and manual client acknowledgements is entirely asynchronous. Therefore if prefetch value is changed while there already are deliveries in flight, a natural race condition arises and there can temporarily be more than prefetch count unacknowledged messages on a channel.

Client Errors: Double Acking and Unknown Tags

Should a client acknowledge the same delivery tag more than once, RabbitMQ will raise a channel error such as PRECONDITION_FAILED - unknown delivery tag 100. The same channel exception will be thrown if an unknown delivery tag is used.

Publisher Confirms
Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional, publish the message, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.

To enable confirms, a client sends the confirm.select method. Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok. Once the confirm.select method is used on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.

Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.

An example in Java that publishes a large number of messages to a channel in confirm mode and waits for the acknowledgements can be found here.

Negative Acknowledgment

In exceptional cases when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will send a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack'ing one or more messages, the broker indicates that it was unable to process the messages and refuses responsibility for them; at that point, the client may choose to re-publish the messages.

After a channel is put into confirm mode, all subsequently published messages will be confirmed or nack'd once. No guarantees are made as to how soon a message is confirmed. No message will be both confirmed and nack'd.
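Because every published message is settled exactly once, by either a basic.ack or a basic.nack, a publisher can keep a map of outstanding sequence numbers and remove each entry when its confirm arrives. The following is a minimal sketch under stated assumptions: OutstandingConfirms is a hypothetical helper, message bodies are plain strings, and the two handle methods would be called from whatever confirm callback the client library provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical publisher-side bookkeeping; not part of any RabbitMQ client library. */
class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Remember a message under the sequence number it was published with. */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** The broker took responsibility: forget the message(s). */
    void handleAck(long seqNo, boolean multiple) {
        settle(seqNo, multiple);
    }

    /** The broker refused responsibility: return the bodies so the caller may re-publish them. */
    List<String> handleNack(long seqNo, boolean multiple) {
        return settle(seqNo, multiple);
    }

    private List<String> settle(long seqNo, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            // Drain every entry up to and including seqNo, one at a time.
            ConcurrentNavigableMap<Long, String> upTo = outstanding.headMap(seqNo, true);
            Map.Entry<Long, String> entry;
            while ((entry = upTo.pollFirstEntry()) != null) {
                settled.add(entry.getValue());
            }
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                settled.add(body);
            }
        }
        return settled;
    }

    int size() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.published(seqNo, "m" + seqNo);
        }
        tracker.handleAck(3, true);                          // confirms 1, 2 and 3
        System.out.println(tracker.handleNack(4, false));   // [m4]
        tracker.handleAck(5, false);
        System.out.println(tracker.size());                  // 0
    }
}
```

Whether to re-publish a nack'd message, and how many times, remains an application decision; the sketch only hands the affected bodies back to the caller.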

basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.

When will messages be confirmed?

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

Ack Latency for Persistent Messages

basic.ack for a persistent message routed to a durable queue will be sent after persisting the message to disk. The RabbitMQ message store persists messages to disk in batches after an interval (a few hundred milliseconds) to minimise the number of fsync(2) calls, or when a queue is idle. This means that under a constant load, latency for basic.ack can reach a few hundred milliseconds. To improve throughput, applications are strongly advised to process acknowledgements asynchronously (as a stream) or publish batches of messages and wait for outstanding confirms. The exact API for this varies between client libraries.

Ordering Considerations for Publisher Confirms

In most cases, RabbitMQ will acknowledge messages to publishers in the same order they were published (this applies for messages published on a single channel). However, publisher acknowledgements are emitted asynchronously and can confirm a single message or a group of messages. The exact moment when a confirm is emitted depends on the delivery mode of a message (persistent vs. transient) and the properties of the queue(s) the message was routed to (see above). Which is to say that different messages can be considered ready for acknowledgement at different times. This means that acknowledgements can arrive in a different order compared to their respective messages. Applications should not depend on the order of acknowledgements when possible.

Publisher Confirms and Guaranteed Delivery

The broker loses persistent messages if it crashes before said messages are written to disk. Under certain conditions, this causes the broker to behave in surprising ways.

For instance, consider this scenario:

a client publishes a persistent message to a durable queue
a client consumes the message from the queue (noting that the message is persistent and the queue durable), but doesn't yet ack it,
the broker dies and is restarted, and
the client reconnects and starts consuming messages.
At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher's channel had been in confirm mode, the publisher would not have received an ack for the lost message (since the message hadn't been written to disk yet).
Limitations
Maximum Delivery Tag

A delivery tag is a 64-bit long value, and thus its maximum value is 9223372036854775807. Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.
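A quick back-of-the-envelope calculation shows how much headroom this leaves. The rate of one million tags per second on a single channel is an assumption picked purely for illustration, as are the class and method names.

```java
/** Back-of-the-envelope check of how long one channel takes to exhaust delivery tags. */
class DeliveryTagHeadroom {
    static final long SECONDS_PER_YEAR = 365L * 24 * 60 * 60;

    /** Whole years until Long.MAX_VALUE tags are used up at a constant rate. */
    static long yearsToExhaust(long tagsPerSecond) {
        return Long.MAX_VALUE / tagsPerSecond / SECONDS_PER_YEAR;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);             // 9223372036854775807
        // Assumed rate: one million deliveries per second on one channel.
        System.out.println(yearsToExhaust(1_000_000L)); // 292471
    }
}
```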


In many messaging scenarios, you must not lose messages. Since AMQP gives few guarantees regarding message persistence/handling, the traditional way to do this is with transactions, which can be unacceptably slow. To remedy this problem, we introduce an extension to AMQP in the form of Lightweight Publisher Confirms.

Guaranteed Delivery with Tx

In RabbitMQ, a persistent message is one that should survive a broker restart. The operative word here is should, since the message can still be lost if broker goes down before it's had a chance to write the message to disk. In some cases, this is not enough and the publisher needs to know whether a message was handled correctly or not. The straightforward solution is to use transactions, i.e. to commit every message.

The publisher would use something like:

    // make the channel transactional, then commit after every publish
    ch.txSelect();
    for (int i = 0; i < MSG_COUNT; ++i) {
        ch.basicPublish("", QUEUE_NAME,
                        MessageProperties.PERSISTENT_BASIC,
                        "nop".getBytes());
        ch.txCommit();
    }
And the consumer would do something like:

    QueueingConsumer qc = new QueueingConsumer(ch);
    ch.basicConsume(QUEUE_NAME, true, qc);
    for (int i = 0; i < MSG_COUNT; ++i) {
        qc.nextDelivery();
        System.out.printf("Consumed %d\n", i);
    }
The complete program including some timing code is available here. It takes a bit more than 4 minutes to publish 10000 messages.

Streaming Lightweight Publisher Confirms

There are two problems with using transactions in this case. The first is that they are blocking: the publisher has to wait for the broker to process each message. Knowing that all the messages with the possible exception of the last one were successfully processed is, usually, too strong a guarantee; it would be enough if the publisher knew which messages had not yet been processed when the broker died. The second problem is that transactions are needlessly heavy: every commit requires an fsync(), which takes a lot of time to complete.

Enter Confirms: once a channel is put into confirm mode, the broker will confirm messages as it processes them. Since this is done asynchronously, the producer can stream publishes and not wait for the broker and the broker can batch disk writes effectively.

Here is the above example, but using confirms:

    // sequence numbers of messages that have been published but not yet confirmed
    private volatile SortedSet<Long> unconfirmedSet =
        Collections.synchronizedSortedSet(new TreeSet<Long>());

    ...

    ch.setConfirmListener(new ConfirmListener() {
        public void handleAck(long seqNo, boolean multiple) {
            if (multiple) {
                // everything up to and including seqNo is confirmed
                unconfirmedSet.headSet(seqNo+1).clear();
            } else {
                unconfirmedSet.remove(seqNo);
            }
        }
        public void handleNack(long seqNo, boolean multiple) {
            // handle the lost messages somehow
        }
    });
    ch.confirmSelect();
    for (long i = 0; i < MSG_COUNT; ++i) {
        unconfirmedSet.add(ch.getNextPublishSeqNo());
        ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,
                        "nop".getBytes());
    }
    // wait until every publish has been confirmed
    while (unconfirmedSet.size() > 0)
        Thread.sleep(10);
The full code is available here. Before going on, it is worth mentioning that running this takes around 2 seconds. It is more than 100 times faster than the transactional code.

What does the code do? It starts by declaring a set which will hold the ids of the so-far unconfirmed messages. Then, it attaches a ConfirmListener to the channel and sets the channel into confirm mode. As it publishes messages, it adds their sequence numbers to the set; at the same time, the ConfirmListener removes them from the set as it receives confirms. Finally, the producer waits for all the messages to be confirmed. The set always holds the messages which need to be retransmitted in case of a failure.

How Confirms Work

Confirms extend standard AMQP by adding the confirm class. This class contains only two methods, confirm.select and confirm.select-ok. In addition, the basic.ack method can be sent to clients.

The confirm.select method enables publisher confirms on a channel. Note that a transactional channel cannot be put into confirm mode and a confirm mode channel cannot be made transactional.

When the confirm.select method is sent/received, the publisher/broker begins numbering publishes (the first publish after the confirm.select is 1). Once a channel is in confirm mode, the publisher should expect to receive basic.ack methods. The delivery-tag field indicates the number of the confirmed message.

When the broker acknowledges a message, it assumes responsibility for it and informs the publisher that it has been handled successfully; what "handled successfully" means is context-dependent.

The basic rules are as follows:

an un-routable mandatory or immediate message is confirmed right after the basic.return;
otherwise, a transient message is confirmed the moment it is enqueued; and,
a persistent message is confirmed when it is persisted to disk or when it is consumed on every queue.


Note that for a persistent message to be confirmed, it must be written to disk or ack'd on all the queues it was delivered to. With regard to confirms, persistent messages delivered to non-durable queues behave like transient messages. Queue deletion, queue purge and basic.reject{requeue=false} simulate a consumer acknowledgement. With respect to per-queue ttl, message expiry simulates a consumer acknowledgement.

If more than one of these conditions are met, only the first causes a confirm to be sent. Every published message will be confirmed sooner or later and no message will be confirmed more than once. Since the basic.return is sent before the basic.ack, once a publisher receives a basic.ack, it knows that it will never hear of that message again.

The broker may always set the multiple bit in the basic.acks. A basic.ack with multiple set means that all messages up-to-and-including delivery-tag are acknowledged.

There are some gotchas regarding confirms. Firstly, the broker makes no guarantees as to when a message will be confirmed, only that it will be confirmed. Secondly, message processing slows down as un-confirmed messages pile up: the broker does several O(log(number-of-unconfirmed-messages)) operations for each confirm-mode publish. Thirdly, if the connection between the publisher and broker drops with outstanding confirms, it does not necessarily mean that the messages were lost, so republishing may result in duplicate messages. Lastly, if something bad should happen inside the broker and cause it to lose messages, it will basic.nack those messages (hence, the handleNack() in ConfirmListener).
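One way to keep unconfirmed messages from piling up is to cap their number on the publisher side with a semaphore, so that publishing pauses until the broker catches up. The sketch below is an assumption-laden illustration, not code from the RabbitMQ client: ConfirmThrottle and its method names are hypothetical, and onSettled is meant to be called from both the ack and the nack callback, since either one settles a publish.

```java
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Semaphore;

/** Hypothetical throttle that caps the number of unconfirmed publishes. */
class ConfirmThrottle {
    private final Semaphore confirmPool;
    private final ConcurrentSkipListSet<Long> unconfirmed = new ConcurrentSkipListSet<>();

    ConfirmThrottle(int maxUnconfirmed) {
        this.confirmPool = new Semaphore(maxUnconfirmed);
    }

    /** Blocking variant: waits for a free slot before the caller publishes seqNo. */
    void beforePublish(long seqNo) {
        confirmPool.acquireUninterruptibly();
        unconfirmed.add(seqNo);
    }

    /** Non-blocking variant: returns false when the cap has been reached. */
    boolean tryBeforePublish(long seqNo) {
        if (!confirmPool.tryAcquire()) {
            return false;
        }
        unconfirmed.add(seqNo);
        return true;
    }

    /** Frees one slot per settled sequence number, honouring the multiple flag. */
    void onSettled(long seqNo, boolean multiple) {
        int released = 0;
        if (multiple) {
            NavigableSet<Long> upTo = unconfirmed.headSet(seqNo, true);
            while (upTo.pollFirst() != null) {
                released++;
            }
        } else if (unconfirmed.remove(seqNo)) {
            released = 1;
        }
        if (released > 0) {
            confirmPool.release(released);
        }
    }

    int availableSlots() {
        return confirmPool.availablePermits();
    }

    public static void main(String[] args) {
        ConfirmThrottle throttle = new ConfirmThrottle(2);
        System.out.println(throttle.tryBeforePublish(1)); // true
        System.out.println(throttle.tryBeforePublish(2)); // true
        System.out.println(throttle.tryBeforePublish(3)); // false, cap reached
        throttle.onSettled(2, true);                      // confirms 1 and 2
        System.out.println(throttle.tryBeforePublish(3)); // true
    }
}
```

Tracking the individual sequence numbers, rather than just counting, is what lets a confirm with multiple set release the right number of slots.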

In summary, Confirms give clients a lightweight way of keeping track of which messages have been processed by the broker and which would need re-publishing in case of broker shutdown or network failure.

This entry was posted on Thursday, February 10th, 2011 at 12:16 pm by Alexandru Scvorţov and is filed under New Features.

6 Responses to “Introducing Publisher Confirms”

Peet Denny Says:
March 28th, 2011 at 5:21 pm
Wow, these are seriously fast; we're able to publish 1 million persistent 2k messages to a remote queue in less than 3 mins. Nice one :)
(We're using 2.3.1, I hear that 2.4 is even faster)

Now we have the interesting problem that our producers are actually faster than our consumers (1 million in 6 mins)
Would you recommend getting our producer to back off, to slow down when unacknowledged messages go above a certain number, or is there a way that we can get our consumers to go faster?

Cheers

alexandru Says:
March 28th, 2011 at 9:37 pm
You probably should throttle the publishers somehow. RabbitMQ will do this automatically when it starts to run out of memory, but you probably want to avoid getting there. In addition, if the publishers are publishing faster than RabbitMQ can write the messages to disk, unconfirmed messages will pile up and slow down the channels and queues.

The MulticastMain Java example contains code to throttle publishers when they exceed a certain number of unconfirmed messages (see the Producer class; it uses a Semaphore confirmPool to block the producer):
MulticastMain.java

There isn't really any trick to getting consumers to consume faster. They just need to process messages more quickly. In general, make sure you're using basic.consume and not basic.get. Try to acknowledge or reject/nack messages as soon as possible. You could also try to set basic.qos to 1 message prefetch.
