Basics of RabbitMQ using AMQP

Base code: https://github.com/rabbitmq/rabbitmq-tutorials

My code:

Enterprise Integration Patterns

Messaging:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/index.html

http://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html

Basic commands

From a Windows installation in command line (previously add sbin/path to PATH):

Start server: >> rabbitmq-server.bat

Listing queues: >> rabbitmqctl.bat list_queues

Admin Console: http://localhost:15672/

Default Port: 5672

Check messages unacknowledged (to monitor if there exist messages without a known tag):rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Listing Exchanges: rabbitmqctl list_exchanges

Listing Bindings: rabbitmqctl list_bindings

Bootstrap a RabbitMQ App

Spring Boot applications have the option of providing their properties through either an application.properties or application.yml file.

Simple RabbitMQ App

Implementing:

  1. NOTE: Using Spring @Profile to tag methods and classes in Spring. Have a RabbitMM basic installation.
  2. POM dependencies for Spring Boot RabbitMQ AMQP protocol
  3. Spring Boot Application class.
  4. Spring Boot Configuration class.
  5. Sender Class.
  6. Receiver Class.

POM File (particularly spring-boot-starter-amqp dependency):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.architecture.messaging</groupId>
    <artifactId>rabbitmq-amqp</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>rabbitmq-demo-2</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.M5</spring-cloud.version>
    </properties>

    <dependencies><dependency>
        <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

The Spring Boot Application (specially SpringApplication.run method):

package org.architecture.messaging.amqp.simple;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Profile: {simple|balance-requesting},{request-balance|create-balance}
 * 
 * java -jar rabbitmq-amqp-1.0.jar
 * --spring.profiles.active=balance-requesting,request-balance java -jar
 * rabbitmq-amqp-1.0.jar
 * --spring.profiles.active=balance-requesting,create-balance java -jar
 * rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-notifications,notify
 * java -jar rabbitmq-amqp-1.0.jar
 * --spring.profiles.active=balance-notifications,send-notification
 * 
 * @author schkola
 *
 */
@SpringBootApplication
@ComponentScan(basePackages = { "org.architecture.messaging.amqp" })
@EnableScheduling
public class SimpleAmqpApplication {

    @Profile("usage")
    @Bean
    public CommandLineRunner usage() {
        return new CommandLineRunner() {

            @Override
            public void run(String... args) throws Exception {
                System.out.println("This app uses Spring Profiles to control its behavior.\n");
                System.out.println(
                        "Sample usage: java -jar rabbitmq-amqp-1.0.jar --spring.profiles.active=balance-requesting,request-balance");
            }

        };
    }

    @Profile("!usage")
    @Bean
    public CommandLineRunner execute() {
        return new SimpleSchedulerRunner();
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleAmqpApplication.class, args);
    }
}

The Configuration class (specially @Configuration).

package org.architecture.messaging.amqp.simple;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({ "simple", "balance-requesting" })
@Configuration
public class SimpleConfiguration {

    @Bean
    public Queue balance() {
        return new Queue("balance");
    }

    @Profile("create-balance")
    @Bean
    public Balance response() {
        return new Balance();
    }

    @Profile("request-balance")
    @Bean
    public RequestBalance request() {
        System.out.println(">>> Creating RequestBalance Bean");
        return new RequestBalance();
    }
}

The Sender class, using RabbitTemplatefor AMQP protocol.

package org.architecture.messaging.amqp.simple;

import java.util.Random;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class RequestBalance {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue queue;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void simpleRequest() {
        Random r = new Random();
        int account = r.nextInt((99999 - 1) + 1) + 1;
        String message = "Request balance for account " + account;
        this.template.convertAndSend(queue.getName(), message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

The Receiver class, with @RabbitListener and @RabbitHandler

package org.architecture.messaging.amqp.simple;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

@RabbitListener(queues = "balance")
public class Balance {

    @RabbitHandler
    public void createBalance(String request) {
        System.out.println(" [x] Received '" + request + "'");
    }

}

Worker RabbitMQ App

Distributing time-consuming tasks among workers. We encapsulate a _task _as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

http://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html

Implementing:

  1. Same Steps 1 to 3 as before.
  2. Spring Boot Configuration class.
  3. Sender Class.
  4. Receiver Class.

Spring Boot configuration file:

package org.architecture.messaging.amqp.worker;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({ "worker", "notifications" })
@Configuration
public class WorkerConfiguration {

    @Bean
    public Queue notifications() {
        return new Queue("notifications");
    }

    @Profile("send-notification")
    private static class NotificationConfig {

        @Bean
        public NotificationSender sendBankNotification() {
            System.out.println(">>> Creating NotificationSender (BANK) Bean");
            return new NotificationSender("BANK");
        }

        @Bean
        public NotificationSender sendCustomerNotification() {
            System.out.println(">>> Creating NotificationSender (CUSTOMER) Bean");
            return new NotificationSender("CUSTOMER");
        }
    }

    @Profile("notify")
    @Bean
    public Notify requestNotification() {
        System.out.println(">>> Creating Notify Bean");
        return new Notify();
    }
}

The sender file is the same as before, using @RabbitTemplate to send messages to RabbitMQ using AMQP.

package org.architecture.messaging.amqp.worker;

import java.util.Random;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Notify {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue queue;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        System.out.println(">>> Into Notify: generating message");
        Random r = new Random();
        int notificationLenght = r.nextInt((10 - 1) + 1) + 1;
        this.template.convertAndSend(queue.getName(), notificationLenght+"");
        System.out.println(" [x] Sent '" + notificationLenght + "'");
    }

}

Receiver class using@RabbitListener and @RabbitHandler. Besides using Spring StopWatchclass to control asynchronous processing for every worker.

package org.architecture.messaging.amqp.worker;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

@RabbitListener(queues = "notifications")
public class NotificationSender {

    private String who;

    public NotificationSender(String who) {
        this.who = who;
    }

    @RabbitHandler
    public void sendEmail(String sMessageLenght) throws InterruptedException {
        int messageLenght = Integer.parseInt(sMessageLenght);
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + this.who + " [x] Received message for notification '" + messageLenght + "'");
        doWork(messageLenght);
        watch.stop();
        System.out.println("instance " + this.who + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(int messageLenght) throws InterruptedException {
        Thread.sleep(1000 * messageLenght);
    }
}

Error Handling

Spring-amqp by default takes a conservative approach to message acknowledgement. If the listener throws an exception the container calls: channel.basicReject(deliveryTag, requeue).The listener could throw an AmqpRejectAndDontRequeueException. After processing the message the listener calls: channel.basicAck().

Setting property defaultRequeueRejected=false could change the behaviour of requeue.

Acknowledged means the consumer must notify RabbitMQ the message has been totally processed.

Message durability

Some common properties for Spring AMQP are in https://docs.spring.io/spring-amqp/reference/htmlsingle/#_common_properties.

Property default Description
durable true When declareExchange is true the durable flag is set to this value
deliveryMode PERSISTENT PERSISTENT or NON_PERSISTENT to determine whether or not RabbitMQ should persist the messages

There is not guarantee to mark a message as persistent for having it stored in a data store. There are more options to do it.

https://www.rabbitmq.com/confirms.html

Message Dispatching

By default, RabbitMQ uses Round robin dispatching algorithm (n-th message is assigned to the n-th consumer). But Spring AMQP uses the Fair dispatching algorithm using SimpleMessageListenerContainerthat defines the value for DEFAULT_PREFETCH_COUNT to be 1. That means RabbitMQ only sends new arrival message to consumer if this does not have a pending message or when the previous message has been acknowledged. DEFAULT_PREFETCH_COUNT set to 0 means Round Robin behaviour.

See more dispatching stragegies in literature.

Publish/Suscribe RabbitMQ App

Implement the fanout pattern to deliver a message to multiple consumers. Essentially, published messages are going to be broadcast to all the receivers.

Fanout Pattern

Fan-in Fan-out

is a way of Multiplexing and Demultiplexing in golang. Fan-in refers to processing multiple input data and combining into a single entity. Fan-out is the exact opposite, dividing the data into multiple smaller chunks, distributing the work amongst a group of workers to parallelize CPU use and I/O.

https://en.wikipedia.org/wiki/Fan-out_(software\

https://medium.com/@thejasbabu/concurrency-patterns-golang-5c5e1bcd0833

https://aws.amazon.com/blogs/compute/messaging-fanout-pattern-for-serverless-architectures-using-amazon-sns/

https://community.mapr.com/thread/19377-fan-out-design-pattern

https://community.mapr.com/external-link.jspa?url=https%3A%2F%2Fheroku.github.io%2Fkafka-demo%2F

Exchanges

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchanges types:

  • direct
  • topic
  • fanout
  • headers

Default fanouts declared for RabbitMQ:

C:\ISBAN\workspaces\messaging-architecture\rabbitmq-demo-2>rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
springCloudBusOutput    topic
amq.topic       topic
springCloudBus  topic
        direct
amq.match       headers
amq.rabbitmq.trace      topic
amq.fanout      fanout
amq.rabbitmq.log        topic
amq.headers     headers
amq.direct      direct

Nameless exchange: it is defined by "".

Temporary queues

Giving a queue a name is important when you want to share the queue between producers and consumers. AnonymousQueue, which creates a non-durable, exclusive, autodelete queue with a generated name.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. Secondly, once we disconnect the consumer the queue should be automatically deleted.

The relationship between exchange and a queue is called a binding.

We need to supply a routingKey when sending, but its value is ignored for fanout exchanges.

The messages will be lost if no queue is bound to the exchange yet.

Implementing

  1. Same Steps 1 to 3 as before.
  2. Spring Boot Configuration class. Using AnonymousQueue, FanoutExchangeand Binding.
  3. Sender Class.
  4. Receiver Class.

Spring Boot configuration file:

package org.architecture.messaging.amqp.fanout;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({ "fanout", "pub-sub", "publish-suscribe" })
@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("new-accounts-fanout");
    }

    @Profile("account-registration")
    private static class BankAccountBuilder {

        @Bean
        public Queue customerAccountQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue bankAccountQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding bindCustomer(FanoutExchange fanout, Queue customerAccountQueue) {
            return BindingBuilder.bind(customerAccountQueue).to(fanout);
        }

        @Bean
        public Binding bindBank(FanoutExchange fanout, Queue bankAccountQueue) {
            return BindingBuilder.bind(bankAccountQueue).to(fanout);
        }

        @Bean
        public AccountRegistration registration() {
            return new AccountRegistration();
        }
    }

    @Profile("new-account")
    @Bean
    public AccountRequest registration() {
        return new AccountRequest();
    }
}

Using Publisher. Using FanoutExchange

package org.architecture.messaging.amqp.fanout;

import java.util.Random;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class AccountRequest {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void request() {
        Random r = new Random();
        int account = r.nextInt((99999 - 1) + 1) + 1;
        String message = "Request new account " + account;
        this.template.convertAndSend(fanout.getName(), "", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

Using Subscriber. Omit using @RabbitHandler.

package org.architecture.messaging.amqp.fanout;

import java.util.Random;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class AccountRegistration {

    @RabbitListener(queues = "#{customerAccountQueue.name}")
    public void createCustomerAccount(String userInfo) throws InterruptedException {
        createAccount(userInfo, "CUSTOMER_ACCOUNT");
    }

    @RabbitListener(queues = "#{bankAccountQueue.name}")
    public void createBankAccount(String userInfo) throws InterruptedException {
        createAccount(userInfo, "BANK_ACCOUNT");
    }

    public void createAccount(String userInfo, String receiver) throws InterruptedException {
        System.out.println("instance " + receiver + " [x] Received message for account creation '" + userInfo + "'");
        Random r = new Random();
        int messageLenght = r.nextInt((10 - 1) + 1) + 1;
        StopWatch watch = new StopWatch();
        watch.start();
        doWork(messageLenght);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(int messageLenght) throws InterruptedException {
        Thread.sleep(1000 * messageLenght);
    }
}

Monitoring:

C:\ISBAN\workspaces\messaging-architecture\rabbitmq-demo-1>rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        balance queue   balance []
        exchange        hello   queue   hello   []
        exchange        notifications   queue   notifications   []
        exchange        spring.gen-PRXBkLIfTkSn_fKVSsIodw       queue   spring.gen-PRXBkLIfTkSn_fKVSsIodw       []
        exchange        spring.gen-j7YewEFZTHudPP9ULNGeWA       queue   spring.gen-j7YewEFZTHudPP9ULNGeWA       []
new-accounts-fanout     exchange        spring.gen-PRXBkLIfTkSn_fKVSsIodw       queue           []
new-accounts-fanout     exchange        spring.gen-j7YewEFZTHudPP9ULNGeWA       queue           []

Routing RabbitMQ App

Make it possible to subscribe only to a subset of the messages. The meaning of a binding key depends on the exchange type. For example, we may want a program which writes log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.

Direct Exchange

The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message. It could define more than one binding for a single queue using different binding key. Messages arriving to the exchange which not match any key are discarded.

It can't do routing based on multiple criteria.

The Spring configuration class:

package org.architecture.messaging.amqp.routing;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({ "direct-exchange", "routing" })
@Configuration
public class RoutingConfiguration {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("logging-exchange");
    }

    @Profile("logger")
    private static class MyLoggerConfig {

        @Bean
        public Queue errorLogQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue appLogQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding bindErrorLog(DirectExchange direct, Queue errorLogQueue) {
            return BindingBuilder.bind(errorLogQueue).to(direct).with("error");
        }

        @Bean
        public Binding bingInfoLog(DirectExchange direct, Queue appLogQueue) {
            return BindingBuilder.bind(appLogQueue).to(direct).with("info");
        }

        @Bean
        public Binding bingDebugLog(DirectExchange direct, Queue appLogQueue) {
            return BindingBuilder.bind(appLogQueue).to(direct).with("debug");
        }

        @Bean
        public MyLogger logger() {
            return new MyLogger();
        }
    }

    @Profile("log")
    @Bean
    public MyLog logging() {
        return new MyLog();
    }
}

The producer class:

package org.architecture.messaging.amqp.routing;

import java.util.Random;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class MyLog {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange direct;

    private String[] keys = { "error", "debug", "info" };

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void log() {
        Random r = new Random();
        int keyIndex = r.nextInt((3 - 1) + 1) + 1;
        String message = "Sending a message to a key " + keys[keyIndex - 1];
        this.template.convertAndSend(direct.getName(), keys[keyIndex - 1], message);
        System.out.println(" [x] Sent '" + message + "'");

    }
}

The consumer class:

package org.architecture.messaging.amqp.routing;

import java.util.Random;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class MyLogger {

    @RabbitListener(queues = "#{errorLogQueue.name}")
    public void logError(String logInfo) throws InterruptedException {
        logger(logInfo, "ERROR");
    }

    @RabbitListener(queues = "#{appLogQueue.name}")
    public void logAppInfo(String logInfo) throws InterruptedException {
        logger(logInfo, "INFO");
    }

    public void logger(String logInfo, String receiver) throws InterruptedException {
        System.out.println("instance " + receiver + " [x] Received message for logging '" + logInfo + "'");
        Random r = new Random();
        int messageLenght = r.nextInt((10 - 1) + 1) + 1;
        StopWatch watch = new StopWatch();
        watch.start();
        doWork(messageLenght);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(int messageLenght) throws InterruptedException {
        Thread.sleep(1000 * messageLenght);
    }
}

Topic RabbitMQ App

In our messaging system we might want to subscribe to not only queues based on the routing key, but also based on the source which produced the message.

Topic Exchange

A message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys:

  • *(star) can substitute for exactly one word.
  • #(hash) can substitute for zero or more words.

There can be as many words in the routing key as you like, up to the limit of 255 bytes.

Implementing

The Spring Configuration file.

package org.architecture.messaging.amqp.topic;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

/**
 * Binding keys: - *.own.* { <type>.own.<status> } Examples: - credit.own.active
 * - debit.own.canceled - other.# { other.<bank>.<status> } Examples: -
 * other.bbva.active - other.santander.inactive
 * 
 * @author Z462725
 *
 */
@Profile({ "topic-exchange", "topics" })
@Configuration
public class TopicConfiguration {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("topic-exchange");
    }

    @Profile("accounts")
    private static class MyAccountsConfig {

        @Bean
        public Queue ownAccountsQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue thirdPartyAccountsQueue() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding bindOwnCreditAccounts(TopicExchange topic, Queue ownAccountsQueue) {
            return BindingBuilder.bind(ownAccountsQueue).to(topic).with("credit.own.*");
        }

        @Bean
        public Binding bingOwnDebitAccounts(TopicExchange topic, Queue ownAccountsQueue) {
            return BindingBuilder.bind(ownAccountsQueue).to(topic).with("debit.own.*");
        }

        @Bean
        public Binding bingThirdPartyAccounts(TopicExchange topic, Queue thirdPartyAccountsQueue) {
            return BindingBuilder.bind(thirdPartyAccountsQueue).to(topic).with("other.#");
        }

        @Bean
        public Account accountInfo() {
            return new Account();
        }
    }

    @Profile("customer")
    @Bean
    public Customer query() {
        return new Customer();
    }
}

The Producer class.

package org.architecture.messaging.amqp.topic;

import java.util.Random;

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Customer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    private final String[] keys = { "credit.own.active", "debit.own.canceled", "other.bbva.active",
            "other.santander.inactive", "ignored.santander.inactive", "credit.ignored.more" };

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void request() {
        Random r = new Random();
        int keyIndex = r.nextInt((6 - 1) + 1) + 1;
        String message = "Sending a message to a key " + keys[keyIndex - 1];
        this.template.convertAndSend(topic.getName(), keys[keyIndex - 1], message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

The Consumer class.

package org.architecture.messaging.amqp.topic;

import java.util.Random;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Account {

    @RabbitListener(queues = "#{ownAccountsQueue.name}")
    public void consultOwnAccounts(String customerInfo) throws InterruptedException {
        accounts(customerInfo, "OWN");
    }

    @RabbitListener(queues = "#{thirdPartyAccountsQueue.name}")
    public void consultThirdPartyAccounts(String customerInfo) throws InterruptedException {
        accounts(customerInfo, "THIRD_PARTY");
    }

    public void accounts(String customerInfo, String receiver) throws InterruptedException {
        System.out.println("instance " + receiver + " [x] Received message for customerInfo '" + customerInfo + "'");
        Random r = new Random();
        int messageLenght = r.nextInt((10 - 1) + 1) + 1;
        StopWatch watch = new StopWatch();
        watch.start();
        doWork(messageLenght);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(int messageLenght) throws InterruptedException {
        Thread.sleep(1000 * messageLenght);
    }
}

RPC RabbitMQ App

For no long-executing time tasks there is an alternative called RPC (Remote Procedure Call) or Client-Server call. It uses the pattern:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html

Disadvantage: The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC.

In mind:

  • Make sure it's obvious which function call is local and which is remote.
  • Document your system. Make the dependencies between components clear.
  • Handle error cases. How should the client react when the RPC server is down for a long time?

Alternative: If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.

Callback Queue

https://docs.spring.io/spring-amqp/reference/htmlsingle/#request-reply

Typically the native client would create a callback queue for every RPC request. That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlationIdproperty is used. Spring-amqp automatically sets a unique value for every request. In addition it handles the details of matching the response with the correct correlationID.

aa

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

  • deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value).
  • contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • replyTo: Commonly used to name a callback queue.
  • correlationId: Useful to correlate RPC responses with requests.

Handling Errors / Timeouts

When the response is not delivered on time, some exceptions occurs:

Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout

Caused by: java.lang.InterruptedException: null

Implementing

The Spring Configuration class.

package org.architecture.messaging.amqp.rpc;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({ "rpc", "client-server" })
@Configuration
public class RpcConfiguration {

    @Profile("client")
    private static class ClientConfig {

        @Bean
        public DirectExchange exchange() {
            return new DirectExchange("rpc-exchange");
        }

        @Bean
        public Client client() {
            return new Client();
        }

    }

    @Profile("server")
    private static class ServerConfig {

        @Bean
        public Queue queue() {
            return new Queue("rpc.requests");
        }

        @Bean
        public DirectExchange exchange() {
            return new DirectExchange("rpc-exchange");
        }

        @Bean
        public Binding binding(DirectExchange exchange, Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with("rpc");
        }

        @Bean
        public Server server() {
            return new Server();
        }

    }
}

The Client class.

package org.architecture.messaging.amqp.rpc;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Client {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange exchange;

    int start = 0;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", start++);
        System.out.println(" [.] Got '" + response + "'");
    }
}

The Server class.

package org.architecture.messaging.amqp.rpc;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

public class Server {

    @RabbitListener(queues = "rpc.requests")
    // @SendTo("tut.rpc.replies") used when the
    // client doesn't set replyTo.
    public int fibonacci(int n) {
        System.out.println(" [x] Received request for " + n);
        int result = fib(n);
        System.out.println(" [.] Returned " + result);
        return result;
    }

    public int fib(int n) {
        return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
    }
}

This solution is scalable without the need to change client calls:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.

Best practices in RPC

  • How should the client react if there are no servers running?
  • Should a client have some kind of timeout for the RPC?
  • If the server malfunctions and raises an exception, should it be forwarded to the client?
  • Protecting against invalid incoming messages (eg checking bounds, type) before processing.

References

Spring AMQP https://docs.spring.io/spring-amqp/reference/htmlsingle/

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/

results matching ""

    No results matching ""