Basics of RabbitMQ using AMQP
Base code: https://github.com/rabbitmq/rabbitmq-tutorials
My code:
Enterprise Integration Patterns
Messaging:
http://www.enterpriseintegrationpatterns.com/patterns/messaging/index.html
http://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html
Basic commands
On a Windows installation, from the command line (first add the RabbitMQ sbin directory to PATH):
Start server: >> rabbitmq-server.bat
Listing queues: >> rabbitmqctl.bat list_queues
Admin Console: http://localhost:15672/
Default Port: 5672
Check unacknowledged messages (to spot messages that were delivered but not yet acknowledged): >> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Listing Exchanges: >> rabbitmqctl.bat list_exchanges
Listing Bindings: >> rabbitmqctl.bat list_bindings
Bootstrap a RabbitMQ App
Spring Boot applications have the option of providing their properties through either an application.properties or application.yml file.
- Spring Initializer: http://start.spring.io/
- Spring Profiles
Simple RabbitMQ App
Implementing:
- NOTE: Spring @Profile is used to tag methods and classes. A basic RabbitMQ installation is required.
- POM dependencies for Spring Boot RabbitMQ AMQP protocol
- Spring Boot Application class.
- Spring Boot Configuration class.
- Sender Class.
- Receiver Class.
POM file (note the spring-boot-starter-amqp dependency):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.architecture.messaging</groupId>
<artifactId>rabbitmq-amqp</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>rabbitmq-demo-2</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.M5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The Spring Boot application class (note the SpringApplication.run method):
package org.architecture.messaging.amqp.simple;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Profile: {simple|balance-requesting},{request-balance|create-balance}
*
* java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-requesting,request-balance
* java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-requesting,create-balance
* java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-notifications,notify
* java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-notifications,send-notification
*
* @author schkola
*
*/
@SpringBootApplication
@ComponentScan(basePackages = { "org.architecture.messaging.amqp" })
@EnableScheduling
public class SimpleAmqpApplication {
@Profile("usage")
@Bean
public CommandLineRunner usage() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
System.out.println("This app uses Spring Profiles to control its behavior.\n");
System.out.println(
"Sample usage: java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-requesting,request-balance");
}
};
}
@Profile("!usage")
@Bean
public CommandLineRunner execute() {
return new SimpleSchedulerRunner();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleAmqpApplication.class, args);
}
}
The Configuration class (note the @Configuration annotation):
package org.architecture.messaging.amqp.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({ "simple", "balance-requesting" })
@Configuration
public class SimpleConfiguration {
@Bean
public Queue balance() {
return new Queue("balance");
}
@Profile("create-balance")
@Bean
public Balance response() {
return new Balance();
}
@Profile("request-balance")
@Bean
public RequestBalance request() {
System.out.println(">>> Creating RequestBalance Bean");
return new RequestBalance();
}
}
The Sender class, using RabbitTemplate to send messages over AMQP:
package org.architecture.messaging.amqp.simple;
import java.util.Random;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class RequestBalance {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void simpleRequest() {
Random r = new Random();
int account = r.nextInt((99999 - 1) + 1) + 1;
String message = "Request balance for account " + account;
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
The Receiver class, with @RabbitListener and @RabbitHandler:
package org.architecture.messaging.amqp.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@RabbitListener(queues = "balance")
public class Balance {
@RabbitHandler
public void createBalance(String request) {
System.out.println(" [x] Received '" + request + "'");
}
}
Worker RabbitMQ App
This app distributes time-consuming tasks among workers. We encapsulate a task as a message and send it to a queue. A worker process running in the background pops the tasks and eventually executes the job. When many workers run, the tasks are shared between them.
http://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html
Implementing:
- Same Steps 1 to 3 as before.
- Spring Boot Configuration class.
- Sender Class.
- Receiver Class.
Spring Boot configuration file:
package org.architecture.messaging.amqp.worker;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({ "worker", "notifications" })
@Configuration
public class WorkerConfiguration {
@Bean
public Queue notifications() {
return new Queue("notifications");
}
@Profile("send-notification")
private static class NotificationConfig {
@Bean
public NotificationSender sendBankNotification() {
System.out.println(">>> Creating NotificationSender (BANK) Bean");
return new NotificationSender("BANK");
}
@Bean
public NotificationSender sendCustomerNotification() {
System.out.println(">>> Creating NotificationSender (CUSTOMER) Bean");
return new NotificationSender("CUSTOMER");
}
}
@Profile("notify")
@Bean
public Notify requestNotification() {
System.out.println(">>> Creating Notify Bean");
return new Notify();
}
}
The sender class works as before, using RabbitTemplate to send messages to RabbitMQ over AMQP.
package org.architecture.messaging.amqp.worker;
import java.util.Random;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Notify {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
System.out.println(">>> Into Notify: generating message");
Random r = new Random();
int notificationLenght = r.nextInt((10 - 1) + 1) + 1;
this.template.convertAndSend(queue.getName(), notificationLenght+"");
System.out.println(" [x] Sent '" + notificationLenght + "'");
}
}
The Receiver class uses @RabbitListener and @RabbitHandler. It also uses Spring's StopWatch class to time the processing done by each worker.
package org.architecture.messaging.amqp.worker;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
@RabbitListener(queues = "notifications")
public class NotificationSender {
private String who;
public NotificationSender(String who) {
this.who = who;
}
@RabbitHandler
public void sendEmail(String sMessageLenght) throws InterruptedException {
int messageLenght = Integer.parseInt(sMessageLenght);
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.who + " [x] Received message for notification '" + messageLenght + "'");
doWork(messageLenght);
watch.stop();
System.out.println("instance " + this.who + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(int messageLenght) throws InterruptedException {
Thread.sleep(1000 * messageLenght);
}
}
Error Handling
By default, Spring AMQP takes a conservative approach to message acknowledgement. If the listener throws an exception, the container calls channel.basicReject(deliveryTag, requeue). Requeue is true by default, unless the property defaultRequeueRejected=false is set or the listener throws an AmqpRejectAndDontRequeueException.
When the listener returns normally, the container acknowledges the message with channel.basicAck().
Acknowledging a message means the consumer notifies RabbitMQ that the message has been fully processed, so RabbitMQ is free to delete it.
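The acknowledgement flow above can be sketched in plain Java. This is a toy model, not Spring AMQP code: no broker is involved, and the names AckFlowSketch, drain, maxDeliveries and RejectAndDontRequeue (a stand-in for AmqpRejectAndDontRequeueException) are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the ack/reject flow; all names here are hypothetical.
class AckFlowSketch {

    /** Stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /**
     * Delivers queued messages to the listener, at most maxDeliveries times in total.
     * Returns the messages that were rejected without being requeued.
     */
    static List<String> drain(Deque<String> queue, Consumer<String> listener,
                              boolean defaultRequeueRejected, int maxDeliveries) {
        List<String> dropped = new ArrayList<>();
        for (int delivery = 0; delivery < maxDeliveries && !queue.isEmpty(); delivery++) {
            String message = queue.pollFirst();
            try {
                listener.accept(message);
                // Normal return: corresponds to basicAck, the message is gone.
            } catch (RejectAndDontRequeue e) {
                dropped.add(message); // corresponds to basicReject(tag, false)
            } catch (RuntimeException e) {
                if (defaultRequeueRejected) {
                    queue.addFirst(message); // corresponds to basicReject(tag, true)
                } else {
                    dropped.add(message);
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("ok", "poison"));
        Consumer<String> listener = message -> {
            if (message.equals("poison")) {
                throw new RejectAndDontRequeue("cannot process " + message);
            }
        };
        System.out.println("Dropped: " + drain(queue, listener, true, 10));
    }
}
```

The maxDeliveries guard exists only so the model terminates: with requeue enabled, a message that always fails would otherwise be redelivered forever, which is the main reason to consider rejecting without requeue.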
Message durability
Some common properties for Spring AMQP are in https://docs.spring.io/spring-amqp/reference/htmlsingle/#_common_properties.
Property | default | Description |
---|---|---|
durable | true | When declareExchange is true the durable flag is set to this value |
deliveryMode | PERSISTENT | PERSISTENT or NON_PERSISTENT to determine whether or not RabbitMQ should persist the messages |
Marking a message as persistent does not fully guarantee that it is written to disk: there is a short window after RabbitMQ accepts a message and before it has saved it. For stronger guarantees, use publisher confirms:
https://www.rabbitmq.com/confirms.html
Message Dispatching
By default, RabbitMQ dispatches messages round-robin: each message is sent to the next consumer in sequence, so on average every consumer gets the same number of messages regardless of how long each one takes to process. Spring AMQP can instead provide fair dispatching through the prefetch count of SimpleMessageListenerContainer. With a prefetch count of 1 (the value of DEFAULT_PREFETCH_COUNT in Spring AMQP 1.x; the default was raised to 250 in 2.0, so it has to be set explicitly there), RabbitMQ only sends a new message to a consumer once that consumer has acknowledged the previous one. A prefetch count of 0 means no limit, which results in round-robin behaviour.
See the literature for more dispatching strategies.
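The difference between the two strategies can be illustrated with a small simulation. This is a sketch under simplifying assumptions: all messages are queued up front, the processing time of each message is known, and the names DispatchSketch, roundRobinMakespan and fairMakespan are hypothetical. It models the behaviour rather than calling RabbitMQ.

```java
import java.util.Arrays;

// Compares total completion time under round-robin and prefetch-1 dispatching.
class DispatchSketch {

    /** Round robin: message i goes to consumer i % consumers, however busy it is. */
    static int roundRobinMakespan(int[] durations, int consumers) {
        int[] busyUntil = new int[consumers];
        for (int i = 0; i < durations.length; i++) {
            busyUntil[i % consumers] += durations[i];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    /** Prefetch 1: the next message goes to the consumer that becomes free first. */
    static int fairMakespan(int[] durations, int consumers) {
        int[] busyUntil = new int[consumers];
        for (int duration : durations) {
            int next = 0;
            for (int c = 1; c < consumers; c++) {
                if (busyUntil[c] < busyUntil[next]) {
                    next = c;
                }
            }
            busyUntil[next] += duration;
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    public static void main(String[] args) {
        int[] durations = {9, 1, 9, 1}; // seconds of work per message
        System.out.println("round robin: " + roundRobinMakespan(durations, 2)); // 18
        System.out.println("prefetch 1:  " + fairMakespan(durations, 2));       // 10
    }
}
```

With alternating long and short tasks, round robin gives one consumer all the long ones, while prefetch 1 lets the idle consumer pick up the next message as soon as it has acknowledged the previous one.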
Publish/Subscribe RabbitMQ App
Implement the fanout pattern to deliver a message to multiple consumers. Essentially, published messages are going to be broadcast to all the receivers.
Fanout Pattern
Fan-in/fan-out is a way of multiplexing and demultiplexing work, for example in Go. Fan-in refers to processing multiple inputs and combining them into a single entity. Fan-out is the exact opposite: dividing the data into multiple smaller chunks and distributing the work among a group of workers to parallelize CPU use and I/O.
https://en.wikipedia.org/wiki/Fan-out_(software)
https://medium.com/@thejasbabu/concurrency-patterns-golang-5c5e1bcd0833
https://community.mapr.com/thread/19377-fan-out-design-pattern
https://community.mapr.com/external-link.jspa?url=https%3A%2F%2Fheroku.github.io%2Fkafka-demo%2F
Exchanges
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.
Exchange types:
- direct
- topic
- fanout
- headers
Exchanges declared on the broker (the amq.* defaults plus those created by applications):
C:\ISBAN\workspaces\messaging-architecture\rabbitmq-demo-2>rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
springCloudBusOutput topic
amq.topic topic
springCloudBus topic
direct
amq.match headers
amq.rabbitmq.trace topic
amq.fanout fanout
amq.rabbitmq.log topic
amq.headers headers
amq.direct direct
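Nameless (default) exchange: it is identified by the empty string "". A message published to it is routed to the queue whose name equals the routing key, which is why the earlier examples could send directly to a queue name.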
Temporary queues
Giving a queue a name is important when you want to share the queue between producers and consumers. When the name does not matter, Spring AMQP provides AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name.
This covers two needs of publish/subscribe: firstly, whenever we connect to RabbitMQ we get a fresh, empty queue; secondly, once the consumer disconnects, the queue is automatically deleted.
The relationship between exchange and a queue is called a binding.
We need to supply a routingKey when sending, but its value is ignored for fanout exchanges.
The messages will be lost if no queue is bound to the exchange yet.
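The two rules above (the routing key is ignored, and messages published before any queue is bound are lost) can be shown with a tiny in-memory model. This is only a sketch: FanoutSketch, bind and publish are hypothetical names, and plain java.util queues stand in for broker queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory model of a fanout exchange; not the Spring AMQP FanoutExchange class.
class FanoutSketch {
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    /** A binding is simply the relationship between the exchange and a queue. */
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    /**
     * Copies the message to every bound queue and returns how many queues got it.
     * The routing key is accepted but never looked at.
     */
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Queue<String> customerAccountQueue = new ArrayDeque<>();
        Queue<String> bankAccountQueue = new ArrayDeque<>();

        // Nothing is bound yet, so this message reaches no queue and is lost.
        System.out.println("delivered to " + exchange.publish("", "too early") + " queues");

        exchange.bind(customerAccountQueue);
        exchange.bind(bankAccountQueue);
        System.out.println("delivered to " + exchange.publish("any.key", "new account") + " queues");
    }
}
```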
Implementing
- Same Steps 1 to 3 as before.
- Spring Boot Configuration class, using AnonymousQueue, FanoutExchange and Binding.
- Sender Class.
- Receiver Class.
Spring Boot configuration file:
package org.architecture.messaging.amqp.fanout;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({ "fanout", "pub-sub", "publish-suscribe" })
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("new-accounts-fanout");
}
@Profile("account-registration")
private static class BankAccountBuilder {
@Bean
public Queue customerAccountQueue() {
return new AnonymousQueue();
}
@Bean
public Queue bankAccountQueue() {
return new AnonymousQueue();
}
@Bean
public Binding bindCustomer(FanoutExchange fanout, Queue customerAccountQueue) {
return BindingBuilder.bind(customerAccountQueue).to(fanout);
}
@Bean
public Binding bindBank(FanoutExchange fanout, Queue bankAccountQueue) {
return BindingBuilder.bind(bankAccountQueue).to(fanout);
}
@Bean
public AccountRegistration registration() {
return new AccountRegistration();
}
}
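// Named differently from BankAccountBuilder.registration() so the two bean names do not clash when both profiles are active.
@Profile("new-account")
@Bean
public AccountRequest accountRequest() {
return new AccountRequest();
}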
}
The publisher class, using FanoutExchange:
package org.architecture.messaging.amqp.fanout;
import java.util.Random;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class AccountRequest {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void request() {
Random r = new Random();
int account = r.nextInt((99999 - 1) + 1) + 1;
String message = "Request new account " + account;
this.template.convertAndSend(fanout.getName(), "", message);
System.out.println(" [x] Sent '" + message + "'");
}
}
The subscriber class. @RabbitListener is placed on each method, so @RabbitHandler is not needed.
package org.architecture.messaging.amqp.fanout;
import java.util.Random;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
public class AccountRegistration {
@RabbitListener(queues = "#{customerAccountQueue.name}")
public void createCustomerAccount(String userInfo) throws InterruptedException {
createAccount(userInfo, "CUSTOMER_ACCOUNT");
}
@RabbitListener(queues = "#{bankAccountQueue.name}")
public void createBankAccount(String userInfo) throws InterruptedException {
createAccount(userInfo, "BANK_ACCOUNT");
}
public void createAccount(String userInfo, String receiver) throws InterruptedException {
System.out.println("instance " + receiver + " [x] Received message for account creation '" + userInfo + "'");
Random r = new Random();
int messageLenght = r.nextInt((10 - 1) + 1) + 1;
StopWatch watch = new StopWatch();
watch.start();
doWork(messageLenght);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(int messageLenght) throws InterruptedException {
Thread.sleep(1000 * messageLenght);
}
}
Monitoring:
C:\ISBAN\workspaces\messaging-architecture\rabbitmq-demo-1>rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange balance queue balance []
exchange hello queue hello []
exchange notifications queue notifications []
exchange spring.gen-PRXBkLIfTkSn_fKVSsIodw queue spring.gen-PRXBkLIfTkSn_fKVSsIodw []
exchange spring.gen-j7YewEFZTHudPP9ULNGeWA queue spring.gen-j7YewEFZTHudPP9ULNGeWA []
new-accounts-fanout exchange spring.gen-PRXBkLIfTkSn_fKVSsIodw queue []
new-accounts-fanout exchange spring.gen-j7YewEFZTHudPP9ULNGeWA queue []
Routing RabbitMQ App
This app makes it possible to subscribe only to a subset of the messages. For example, we may want a program that writes log messages to disk to receive only critical errors, and not waste disk space on warning or info messages. This is done with binding keys, whose meaning depends on the exchange type.
Direct Exchange
The routing algorithm behind a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message. A single queue can have more than one binding, each with a different binding key. Messages that arrive at the exchange and match no binding key are discarded.
A direct exchange cannot route based on multiple criteria.
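The exact-match rule can be modelled in a few lines of plain Java. This is an illustrative sketch, not the broker's implementation: DirectExchangeSketch, bind and publish are hypothetical names, and the keys mirror the error, info and debug keys used in this section.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory model of direct-exchange routing by exact key match.
class DirectExchangeSketch {
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** The same queue may be bound several times with different binding keys. */
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Returns the number of queues that received the message; 0 means it was discarded. */
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> errorLogQueue = new ArrayDeque<>();
        Queue<String> appLogQueue = new ArrayDeque<>();
        exchange.bind(errorLogQueue, "error");
        exchange.bind(appLogQueue, "info");
        exchange.bind(appLogQueue, "debug");

        exchange.publish("error", "disk full");
        exchange.publish("debug", "cache miss");
        exchange.publish("warning", "nobody listens to this key");
        System.out.println("error queue: " + errorLogQueue);
        System.out.println("app queue:   " + appLogQueue);
    }
}
```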
The Spring configuration class:
package org.architecture.messaging.amqp.routing;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({ "direct-exchange", "routing" })
@Configuration
public class RoutingConfiguration {
@Bean
public DirectExchange direct() {
return new DirectExchange("logging-exchange");
}
@Profile("logger")
private static class MyLoggerConfig {
@Bean
public Queue errorLogQueue() {
return new AnonymousQueue();
}
@Bean
public Queue appLogQueue() {
return new AnonymousQueue();
}
@Bean
public Binding bindErrorLog(DirectExchange direct, Queue errorLogQueue) {
return BindingBuilder.bind(errorLogQueue).to(direct).with("error");
}
@Bean
public Binding bindInfoLog(DirectExchange direct, Queue appLogQueue) {
return BindingBuilder.bind(appLogQueue).to(direct).with("info");
}
@Bean
public Binding bindDebugLog(DirectExchange direct, Queue appLogQueue) {
return BindingBuilder.bind(appLogQueue).to(direct).with("debug");
}
@Bean
public MyLogger logger() {
return new MyLogger();
}
}
@Profile("log")
@Bean
public MyLog logging() {
return new MyLog();
}
}
The producer class:
package org.architecture.messaging.amqp.routing;
import java.util.Random;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class MyLog {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
private String[] keys = { "error", "debug", "info" };
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void log() {
Random r = new Random();
int keyIndex = r.nextInt((3 - 1) + 1) + 1;
String message = "Sending a message to a key " + keys[keyIndex - 1];
this.template.convertAndSend(direct.getName(), keys[keyIndex - 1], message);
System.out.println(" [x] Sent '" + message + "'");
}
}
The consumer class:
package org.architecture.messaging.amqp.routing;
import java.util.Random;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
public class MyLogger {
@RabbitListener(queues = "#{errorLogQueue.name}")
public void logError(String logInfo) throws InterruptedException {
logger(logInfo, "ERROR");
}
@RabbitListener(queues = "#{appLogQueue.name}")
public void logAppInfo(String logInfo) throws InterruptedException {
logger(logInfo, "INFO");
}
public void logger(String logInfo, String receiver) throws InterruptedException {
System.out.println("instance " + receiver + " [x] Received message for logging '" + logInfo + "'");
Random r = new Random();
int messageLenght = r.nextInt((10 - 1) + 1) + 1;
StopWatch watch = new StopWatch();
watch.start();
doWork(messageLenght);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(int messageLenght) throws InterruptedException {
Thread.sleep(1000 * messageLenght);
}
}
Topic RabbitMQ App
In our messaging system we might want to subscribe to messages based not only on a single routing key value, but also on the source that produced the message.
Topic Exchange
Messages sent to a topic exchange must have a routing key made of words delimited by dots. A message sent with a particular routing key is delivered to all the queues that are bound with a matching binding key. There are two important special cases for binding keys:
- * (star) can substitute for exactly one word.
- # (hash) can substitute for zero or more words.
There can be as many words in the routing key as you like, up to the limit of 255 bytes.
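The matching rules for * and # can be written down as a short recursive function. This is a sketch of the rules as stated above, not RabbitMQ's actual matching code; TopicMatchSketch and matches are hypothetical names, and the keys come from the examples in this section.

```java
// Matches a routing key against a topic binding key, word by word.
class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: every word must be consumed too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1); // '*' consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("credit.own.*", "credit.own.active"));   // true
        System.out.println(matches("credit.own.*", "credit.ignored.more")); // false
        System.out.println(matches("other.#", "other.bbva.active"));        // true
        System.out.println(matches("other.#", "other"));                    // true
    }
}
```

Note that a binding key of just # receives every message (like a fanout exchange), and a binding key without * or # behaves like a direct exchange binding.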
Implementing
The Spring Configuration file.
package org.architecture.messaging.amqp.topic;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
/**
* Binding keys:
* - *.own.* { <type>.own.<status> } Examples: credit.own.active, debit.own.canceled
* - other.# { other.<bank>.<status> } Examples: other.bbva.active, other.santander.inactive
*
* @author Z462725
*
*/
@Profile({ "topic-exchange", "topics" })
@Configuration
public class TopicConfiguration {
@Bean
public TopicExchange topic() {
return new TopicExchange("topic-exchange");
}
@Profile("accounts")
private static class MyAccountsConfig {
@Bean
public Queue ownAccountsQueue() {
return new AnonymousQueue();
}
@Bean
public Queue thirdPartyAccountsQueue() {
return new AnonymousQueue();
}
@Bean
public Binding bindOwnCreditAccounts(TopicExchange topic, Queue ownAccountsQueue) {
return BindingBuilder.bind(ownAccountsQueue).to(topic).with("credit.own.*");
}
@Bean
public Binding bindOwnDebitAccounts(TopicExchange topic, Queue ownAccountsQueue) {
return BindingBuilder.bind(ownAccountsQueue).to(topic).with("debit.own.*");
}
@Bean
public Binding bindThirdPartyAccounts(TopicExchange topic, Queue thirdPartyAccountsQueue) {
return BindingBuilder.bind(thirdPartyAccountsQueue).to(topic).with("other.#");
}
@Bean
public Account accountInfo() {
return new Account();
}
}
@Profile("customer")
@Bean
public Customer query() {
return new Customer();
}
}
The Producer class.
package org.architecture.messaging.amqp.topic;
import java.util.Random;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Customer {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
private final String[] keys = { "credit.own.active", "debit.own.canceled", "other.bbva.active",
"other.santander.inactive", "ignored.santander.inactive", "credit.ignored.more" };
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void request() {
Random r = new Random();
int keyIndex = r.nextInt((6 - 1) + 1) + 1;
String message = "Sending a message to a key " + keys[keyIndex - 1];
this.template.convertAndSend(topic.getName(), keys[keyIndex - 1], message);
System.out.println(" [x] Sent '" + message + "'");
}
}
The Consumer class.
package org.architecture.messaging.amqp.topic;
import java.util.Random;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;
public class Account {
@RabbitListener(queues = "#{ownAccountsQueue.name}")
public void consultOwnAccounts(String customerInfo) throws InterruptedException {
accounts(customerInfo, "OWN");
}
@RabbitListener(queues = "#{thirdPartyAccountsQueue.name}")
public void consultThirdPartyAccounts(String customerInfo) throws InterruptedException {
accounts(customerInfo, "THIRD_PARTY");
}
public void accounts(String customerInfo, String receiver) throws InterruptedException {
System.out.println("instance " + receiver + " [x] Received message for customerInfo '" + customerInfo + "'");
Random r = new Random();
int messageLenght = r.nextInt((10 - 1) + 1) + 1;
StopWatch watch = new StopWatch();
watch.start();
doWork(messageLenght);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(int messageLenght) throws InterruptedException {
Thread.sleep(1000 * messageLenght);
}
}
RPC RabbitMQ App
For tasks that do not take long to execute there is an alternative called RPC (Remote Procedure Call), or client-server call. It uses the Request-Reply pattern:
http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
Disadvantage: The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC.
Keep in mind:
- Make sure it's obvious which function call is local and which is remote.
- Document your system. Make the dependencies between components clear.
- Handle error cases. How should the client react when the RPC server is down for a long time?
Alternative: If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
Callback Queue
https://docs.spring.io/spring-amqp/reference/htmlsingle/#request-reply
Typically the native client would create a callback queue for every RPC request. That is inefficient, so a single callback queue per client is preferable. That raises a new issue: having received a response in that queue, it is not clear to which request the response belongs. That is when the correlationId property is used. Spring AMQP automatically sets a unique value for every request. In addition, it handles the details of matching the response with the correct correlationId.
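The idea of matching replies to requests on a shared callback queue can be sketched with a map of pending requests. Spring AMQP does this internally; the code below is only an illustration of the principle, and CorrelationSketch, register and onReply are hypothetical names.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Tracks outstanding requests by correlationId so replies can arrive in any order.
class CorrelationSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the unique correlationId to send with it. */
    String register(CompletableFuture<String> reply) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, reply);
        return correlationId;
    }

    /**
     * Called for every message taken from the shared callback queue.
     * Returns false when the id is unknown (for example a late or duplicate reply).
     */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false; // nothing is waiting for this reply: discard it
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch();
        CompletableFuture<String> fibOfFive = new CompletableFuture<>();
        CompletableFuture<String> fibOfSix = new CompletableFuture<>();
        String idFive = client.register(fibOfFive);
        String idSix = client.register(fibOfSix);

        // Replies arrive out of order but still reach the right caller.
        client.onReply(idSix, "8");
        client.onReply(idFive, "5");
        System.out.println("fib(5) = " + fibOfFive.join() + ", fib(6) = " + fibOfSix.join());
    }
}
```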
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value).
- contentType: Used to describe the MIME type of the encoding. For example, for the often-used JSON encoding it is good practice to set this property to application/json.
- replyTo: Commonly used to name a callback queue.
- correlationId: Useful to correlate RPC responses with requests.
Handling Errors / Timeouts
When the reply is not delivered in time, exceptions such as the following occur:
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
Caused by: java.lang.InterruptedException: null
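On the client side, a missing reply has to be handled explicitly. The sketch below shows the general idea with java.util.concurrent only; it does not use RabbitTemplate, and ReplyTimeoutSketch and awaitReply are hypothetical names. With Spring AMQP the equivalent concern is, as far as we know, that convertSendAndReceive returns null when no reply arrives within the template's reply timeout, so a client should check for null before using the result.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Waits for a reply for a bounded time and reports "no reply" instead of blocking forever.
class ReplyTimeoutSketch {

    static Optional<Integer> awaitReply(CompletableFuture<Integer> reply, long timeoutMillis) {
        try {
            return Optional.ofNullable(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // no reply in time: the caller decides whether to retry
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // the reply completed with a failure
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> answered = CompletableFuture.completedFuture(55);
        CompletableFuture<Integer> neverAnswered = new CompletableFuture<>();

        System.out.println("answered:       " + awaitReply(answered, 100));
        System.out.println("never answered: " + awaitReply(neverAnswered, 100));
    }
}
```

Returning an empty Optional keeps the decision (retry, fall back, or report an error) in the caller, which is one way to answer the question of how a client should react when the RPC server is slow or down.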
Implementing
The Spring Configuration class.
package org.architecture.messaging.amqp.rpc;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({ "rpc", "client-server" })
@Configuration
public class RpcConfiguration {
@Profile("client")
private static class ClientConfig {
@Bean
public DirectExchange exchange() {
return new DirectExchange("rpc-exchange");
}
@Bean
public Client client() {
return new Client();
}
}
@Profile("server")
private static class ServerConfig {
@Bean
public Queue queue() {
return new Queue("rpc.requests");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("rpc-exchange");
}
@Bean
public Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("rpc");
}
@Bean
public Server server() {
return new Server();
}
}
}
The Client class.
package org.architecture.messaging.amqp.rpc;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Client {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange exchange;
int start = 0;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
System.out.println(" [x] Requesting fib(" + start + ")");
Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");
}
}
The Server class.
package org.architecture.messaging.amqp.rpc;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
public class Server {
@RabbitListener(queues = "rpc.requests")
// @SendTo("tut.rpc.replies") used when the
// client doesn't set replyTo.
public int fibonacci(int n) {
System.out.println(" [x] Received request for " + n);
int result = fib(n);
System.out.println(" [.] Returned " + result);
return result;
}
public int fib(int n) {
return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
}
}
This solution is scalable without the need to change client calls:
- If the RPC server is too slow, you can scale up by just running another one. Try running a second Server instance in a new console.
Best practices in RPC
- How should the client react if there are no servers running?
- Should a client have some kind of timeout for the RPC?
- If the server malfunctions and raises an exception, should it be forwarded to the client?
- Protect against invalid incoming messages (e.g. check bounds and type) before processing.
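As an illustration of the last point, the Fibonacci computation could validate its input before doing any work. This is a sketch, not the Server class shown earlier: FibGuardSketch and MAX_N are hypothetical names, and the iterative loop is a swapped-in alternative to the recursive fib used in this document.

```java
// Bounds-checked, iterative Fibonacci for use behind an RPC endpoint.
class FibGuardSketch {

    /** fib(46) = 1836311903 is the largest Fibonacci number that fits in an int. */
    static final int MAX_N = 46;

    static int fib(int n) {
        if (n < 0 || n > MAX_N) {
            throw new IllegalArgumentException("n must be between 0 and " + MAX_N + ", got " + n);
        }
        int previous = 0; // fib(i)
        int current = 1;  // fib(i + 1)
        for (int i = 0; i < n; i++) {
            int next = previous + current;
            previous = current;
            current = next;
        }
        return previous;
    }

    public static void main(String[] args) {
        System.out.println("fib(10) = " + fib(10)); // 55
        try {
            fib(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Rejecting out-of-range requests up front avoids both silent integer overflow and the exponential running time of the naive recursion for large n, which would otherwise make replies arrive after the client's timeout.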
References
Spring AMQP https://docs.spring.io/spring-amqp/reference/htmlsingle/
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/