So we have this project at the University where we develope a Microservice application. And for four month now we did the communication with a rest-api. There was never a problem with that until our supervisor decided to make the internal communication of the microservices synchronous with ActiveMQ. So bear with me how i found a way for me to make it work.

The project is available on my Github - check it out here.

ActiveMQ Setup

There is a good explanation on how to get started with ActiveMQ on their website, but since we where using docker, it’s much simpler to run the following docker command: docker run -p 61616:61616 -p 8161:8161 -t webcenter/activemq. If the container is started you can find the ActiveMQ-dashboard under http://localhost:8161/admin with username and password set to ‘admin’. For production you can find more setup-posabillities here.

Application Setup

To setup for developing with ActiveMQ we need to do the following steps for the producer and the consumer project.

  1. Add the following dependency to your pom.xml:
1
2
3
4
5
<!-- ActiveMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  1. Add the following properties to your application.properties or application.yml:
1
2
3
4
activemq.user="admin"
activemq.password="admin"
activemq.broker-url=tcp://localhost:61616
server.port=8081
  1. Add a new Config.java with following code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@EnableJms
@Configuration
public class Config {

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Value("${activemq.user}")
    private String username;

    @Value("${activemq.password}")
    private String password;

    @Bean
    public Destination addDestination() {
        return new ActiveMQQueue("add-user-queue");
    }

    @Bean
    public Destination getRequestDestination() {
        return new ActiveMQQueue("get-user-queue");
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory
        		= new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        return new JmsTemplate(connectionFactory());
    }
}

At line 5 to 12 we get the values of the application properties. At line 14 to 22 we implement two new ActiveMqQueues as destination, which we will later use to queue our messages. The last lines are to connect to our ActiveMQ (line 24 to 31) and create a template (line 35) to send our messages with.

That’s that with the application setup and we can finally think about some implementations!

Simple Object Messaging

First we need an object to send, in my case it’s the UserObject with name and age. Simple like that.

1
2
3
4
5
public class UserObject {
    private String name;
    private int age;
    // Getter and Setter ...
}

The next thing we need is the produce, which put our user object in the correct queue of the ActiveMQ. For that we create an REST-controller class with a POST-mapping on the route /add, which takes an user as the request-body like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
@RequestMapping("/")
public class Producer {

    @Autowired
    JmsTemplate jmsTemplate;

    @Autowired
    Destination addDestination;

    @PostMapping("/add")
    ResponseEntity<?> add(@RequestBody UserObject user)
    		throws JsonProcessingException {

        ObjectMapper mapper = new ObjectMapper();
        String userAsString = mapper.writeValueAsString(user);

        jmsTemplate.convertAndSend(addDestination, userAsString);

        return ResponseEntity.ok("Produced successfully: "
        		+  userAsString);
    }
}

At line 15 we convert the user object we got to a json-string, which we are then send to our addDestination of our ActiveMQ (line 17).

Now we can test our producer. Start your ActiveMQ, go to http://localhost:8161/admin, log in (standart username and password is admin) and click on Queues. It should look something like that:

ActiveMQ Queues empty

Now we run our Spring-activemq-producer project and send a post-request with a user in json-format:

1
2
> curl -d '{"name":"Jacob","age":28}' -H "Content-Type: application/json"
-X POST http://localhost:8081/add

We get the return Produced successfully: {"name":"Jacob","age":28}

If we check our ActiveMQ-dashboard we see there is a queue called add-user-queue with one enqueued message, which is also still pending.

ActiveMQ Message pending

Now we can produce messages and need a consumer to work of the queue. For that we just create a consumer component class with a JMSListener function in the consumer-spring-project like so:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class Consumer {

    @Autowired
    JmsTemplate jmsTemplate;

    private List<UserObject> list = new ArrayList<>();
    private static int index = 0;

    @JmsListener(destination = "add-user-queue")
    public void add(String userAsString) throws IOException {

        ObjectMapper mapper = new ObjectMapper();
        UserObject user = mapper.readValue(userAsString,
        		UserObject.class);

        list.add(index, user);

        System.out.println("Added user: " + user.getName() + ", "
        		+ user.getAge() + " at index " + index);
        index++;
}

At line 10 we define the destination to listen to, in our case it’s the queue called ‘add-user-queue’. The function takes the user in json-format, converts it to a user object (line 14), and add it to our user list (line 16).

Thats it with the consumer and we can just run the application since we already have a pending message in our queue. Imidiatly after the project launched we should see Added user: Jacob, 28 at index 0 in our project console and if we check our ActiveMQ-dashboard it should look like this:

ActiveMQ Message consumed

No pending message, but a new consumer (the one we just created) and one message dequeued.

Request/Reply Messaging

To realize a request-reply-pattern with ActiveMQ, we basically create a temporary queue, which we are putting in the message itself. The consumer then knows where to produce the reply. The producer then have to check the temporary queue for a reply.

So lets implement the producer function in the Producer.java. This function handles a GET-request to /get/{index}, requests the user with given index from the consumer and expects a reply.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Autowired
Destination getRequestDestination;

@Autowired
ConnectionFactory connectionFactory;

@GetMapping("/get/{index}")
public String request(@PathVariable("index") final int index)
		throws JMSException, JsonProcessingException {

    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false,
    		Session.AUTO_ACKNOWLEDGE);

    Destination replyDestination = session.createTemporaryQueue();

    Message message = session.createMessage();
    message.setIntProperty("index", index);
    message.setJMSReplyTo(replyDestination);
    message.setJMSCorrelationID(Long.toHexString(
    		new Random(System.currentTimeMillis()).nextLong()));

    jmsTemplate.convertAndSend(getRequestDestination, message);

    MessageConsumer consumer = session.createConsumer(replyDestination);
    TextMessage reply = (TextMessage)consumer.receive();
    System.out.println("RECEIVED: "  + reply.getText());

    session.close();

    return "Received successfully: " +  reply.getText();
}

At line 16 we define a new temporary queue, which we are attaching to the message with a JMSCorrelationID (line 20) and on which we are set a consumer to receive the reply (line 26f). We put the index as property to the message (line 19). The request message is produced into a fixed queue (line 23).

For testing I added two users like this:

1
2
3
4
5
6
7
> curl -d '{"name":"Jacob","age":28}' -H "Content-Type: application/json"
-X POST http://localhost:8081/add
> Produced successfully: {"name":"Jacob","age":28}

> curl -d '{"name":"Jenny","age":23}' -H "Content-Type: application/json"
-X POST http://localhost:8081/add
> Produced successfully: {"name":"Jenny","age":23}

After that I requested the user with index equal to zero (which should be ‘Jacob’):

1
> curl -X GET http://localhost:8081/get/0

ActiveMQ Message consumed

In the ActiveMQ-dashboard we now see that the user I added where consumed, but the request message is pending. So lets fix that and create a consumer which will reply the right user. For that we add another JMSListener to our consumer-spring-project, which is listening to the ‘get-user-queue’, get the right user from our list and reply to the temporary queue.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Autowired
ConnectionFactory connectionFactory;

@JmsListener(destination = "get-user-queue")
public void receiveReply(Message indexMessage)
		throws IOException, JMSException {
        
    // System.out.println("Received Message: " + indexMessage);

    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false,
    		Session.AUTO_ACKNOWLEDGE);

    ObjectMapper mapper = new ObjectMapper();
    int i = indexMessage.getIntProperty("index");

    String replyPayload = "No user with this index.";

    try {
        UserObject user = list.get(i);
        replyPayload = mapper.writeValueAsString(user);

    } catch (Exception e) {
        System.out.println("ERROR: " + e);
    }

    TextMessage replyMessage = session.createTextMessage(replyPayload);
    replyMessage.setJMSDestination(indexMessage.getJMSReplyTo());
    replyMessage.setJMSCorrelationID(indexMessage.getJMSCorrelationID());

    MessageProducer producer = session.createProducer(
    		indexMessage.getJMSReplyTo());
    producer.send(replyMessage);

    session.close();

    System.out.println("Replyed Message: " + replyMessage);

}

At line 16 we get the index out of the message, get the user with this index from the list (line 21), convert it and set it as payload to a new message (line 28). After that we create a producer to put the message to the temporary queue we received with the indexMessage (line 29f and 32f).

So lets do the last test and run the consumer project. Because the consumer project holds the user-list we also have to put users in again:

1
2
3
4
5
6
7
> curl -d '{"name":"Jacob","age":28}' -H "Content-Type: application/json"
-X POST http://localhost:8081/add
> Produced successfully: {"name":"Jacob","age":28}

> curl -d '{"name":"Jenny","age":23}' -H "Content-Type: application/json"
-X POST http://localhost:8081/add
> Produced successfully: {"name":"Jenny","age":23}

After that lets request the user with index equal to zero (which should be ‘Jacob’):

1
2
3
> curl -X GET http://localhost:8081/get/0

> Received successfully: {"name":"Jacob","age":28}

We received the correct user in json-format, and our ActiveMQ-dashboard looks like that:

ActiveMQ Message consumed

So everything went right.

I hope I was able to help you. Maybe it’s not the best solution but it worked for me. The project is available on my Github - check it out here.