Scheduled messages with RabbitMQ using plugin & Java

Some business scenarios require sending a message after a delay or at a particular time, for example:

  1. Sending a message a few minutes after activation
  2. Batching messages using a delay
  3. Sending out reports after a delay

Since the AMQP protocol doesn’t have a native delayed queue feature, a search for “how to delay/schedule messages in RabbitMQ” will most likely turn up two possible solutions:

  1. One solution is to combine the message TTL feature with dead-lettering to emulate a delay.
  2. The second solution is to use the official RabbitMQ delayed messages exchange plugin.

Both solutions are valid, but the second solution is relatively simple when compared to solutions based on Dead Letter Exchanges and message TTL.
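
For comparison, here is a minimal sketch of what the first solution involves, assuming the Node.js amqplib client. The helper name, exchange name, and queue name are hypothetical. It only builds the queue options, so it runs without a broker:

```javascript
// Sketch of the TTL + dead-letter approach (solution 1). All names are hypothetical.
// Messages wait in a "delay" queue that has no consumers; when their TTL expires,
// RabbitMQ dead-letters them to the exchange the real consumers are bound to.
function delayQueueOptions(delayMs, targetExchange, targetRoutingKey) {
  if (!Number.isInteger(delayMs) || delayMs < 0) {
    throw new RangeError("delayMs must be a non-negative integer");
  }
  return {
    durable: true,
    messageTtl: delayMs,                    // sent as the x-message-ttl queue argument
    deadLetterExchange: targetExchange,     // sent as x-dead-letter-exchange
    deadLetterRoutingKey: targetRoutingKey, // sent as x-dead-letter-routing-key
  };
}

const ttlOptions = delayQueueOptions(60000, "work.exchange", "work");
console.log(ttlOptions);

// With an open amqplib channel this would be used roughly as:
//   await channel.assertQueue("work.delay.60s", ttlOptions);
```

Because the TTL here is a property of the queue, each distinct delay needs its own delay queue, which is part of why the plugin route tends to be simpler.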

The RabbitMQ delayed message exchange plugin adds a new exchange type to RabbitMQ. Messages published to such an exchange are stored internally, in a Mnesia table, until they are due for delivery. Because the table is kept on disk, delayed messages survive a restart of the node that holds them.

So let’s implement the second solution, starting with installing the plugin. Before that, check its prerequisites:

  1. RabbitMQ 3.5.8 and later versions.
  2. Erlang/OTP 18.0 and later versions

Installing the Plugin

To install the plugin, go to the Community Plugins page, search for “rabbitmq_delayed_message_exchange”, and download the .ez file (a zip archive with metadata) that matches your RabbitMQ version. Copy it into RabbitMQ’s plugins directory.

The plugins directory location is determined by the RABBITMQ_PLUGINS_DIR environment variable. Its default location depends on how RabbitMQ was installed.

Once the file has been copied to the correct directory, enable the plugin by running the following command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Declaring Exchange

To delay messages, declare an exchange with the type x-delayed-message:

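//Author: Preetham Umarani
// `channel` is assumed to be an open amqplib channel; `exchange` is the exchange name.
channel.assertExchange(exchange, "x-delayed-message", {
  autoDelete: false,
  durable: true,
  arguments: {
    // routing behaviour the exchange applies once the delay has elapsed
    "x-delayed-type": "direct"
  }
})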

When we declared the exchange above, we set the x-delayed-type argument to “direct”, so the exchange routes messages like a direct exchange once their delay has elapsed. We could instead pass topic, fanout, or a custom exchange type provided by another plugin.
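
To make the choice of x-delayed-type explicit, the options can be built by a small helper. This is only a sketch: the helper name, the `allowCustom` flag, and the exchange, queue, and binding names in the comments are hypothetical, and the amqplib client is assumed.

```javascript
// Hypothetical helper: builds the options for an x-delayed-message exchange.
const BUILT_IN_TYPES = ["direct", "topic", "fanout", "headers"];

function delayedExchangeOptions(routingType, { allowCustom = false } = {}) {
  // Custom types come from other plugins, so they are only accepted on request.
  if (!allowCustom && !BUILT_IN_TYPES.includes(routingType)) {
    throw new Error(`unknown exchange type: ${routingType}`);
  }
  return {
    durable: true,
    autoDelete: false,
    // how messages are routed once their delay has elapsed
    arguments: { "x-delayed-type": routingType },
  };
}

console.log(delayedExchangeOptions("topic"));

// With an open amqplib channel this would be used roughly as:
//   await channel.assertExchange("jobs.delayed", "x-delayed-message",
//                                delayedExchangeOptions("topic"));
//   await channel.assertQueue("jobs");
//   await channel.bindQueue("jobs", "jobs.delayed", "jobs.#");
```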

Publishing Messages

To delay a message, publish it with the custom header x-delay, whose value is the delay in milliseconds. The message is delivered to the matching queues after x-delay milliseconds; until then it is stored in the Mnesia table.

If the x-delay header is not present, then the plugin will proceed to route the message without delay.
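
Since x-delay is a relative delay, delivering "at a particular time" means converting a target timestamp into milliseconds from now. Below is a minimal sketch of that conversion. The helper name is hypothetical, and the upper bound of 2^32 - 1 milliseconds is an assumption here; check the plugin's documentation for the limit that applies to your version.

```javascript
// Assumed upper bound for x-delay; verify against the plugin documentation.
const MAX_DELAY_MS = 2 ** 32 - 1;

// Hypothetical helper: turns a target delivery time into publish options.
function publishOptionsFor(deliverAt, now = new Date()) {
  // A target time in the past means "deliver immediately" (delay of 0).
  const delay = Math.max(0, deliverAt.getTime() - now.getTime());
  if (delay > MAX_DELAY_MS) {
    throw new RangeError(`delay of ${delay} ms exceeds ${MAX_DELAY_MS} ms`);
  }
  return { headers: { "x-delay": delay } };
}

const now = new Date("2024-01-01T00:00:00Z");
const deliverAt = new Date("2024-01-01T00:05:00Z");
const scheduled = publishOptionsFor(deliverAt, now);
console.log(scheduled.headers["x-delay"]); // 300000, i.e. five minutes

// With an open amqplib channel this would be used roughly as:
//   channel.publish(exchange, routingKey, Buffer.from(payload), scheduled);
```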

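//Author: Preetham Umarani
// The second argument is the routing key; with a direct exchange it must match the queue's binding key.
channel.publish(exchange, queue, Buffer.from(params), {
  headers: {
    "x-delay": delayInMilliSeconds // delay in milliseconds
  }
})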

Checking if a Message was Delayed

To check whether a message was delayed, inspect the x-delay header of the received message: it is set to the negated delay.

For example, if you published a message with a 10000-millisecond delay, the consumer receiving it will find the x-delay header set to -10000.
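
On the consumer side, that check could look roughly like the sketch below. The helper name is hypothetical, and the message shape (`properties.headers`) is the one amqplib hands to consumers; the sample message is a hand-built stand-in so the snippet runs without a broker.

```javascript
// Hypothetical helper: returns the delay (in ms) the plugin applied to a
// received message, or null if the message does not appear to have been delayed.
function appliedDelayMs(msg) {
  const headers = (msg.properties && msg.properties.headers) || {};
  const value = headers["x-delay"];
  // The header is negated on delivery, so only a negative number counts.
  if (typeof value !== "number" || value >= 0) {
    return null;
  }
  return -value;
}

// Stand-in for a message received from a delayed exchange.
const received = { properties: { headers: { "x-delay": -10000 } } };
console.log(appliedDelayMs(received)); // 10000

// In a real consumer (assuming amqplib):
//   channel.consume(queue, (msg) => { const delay = appliedDelayMs(msg); /* ... */ });
```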

Try this out: It’s fairly simple and straightforward. Thank me later. Till next article. BBYE!
