Saturday, October 18, 2014

Java Message Service (JMS) Introduction

Java Message Service (JMS) provides a mechanism for applications to create, send and receive messages using reliable, asynchronous and loosely coupled communication. JMS is defined as part of the Java Platform, Enterprise Edition (Java EE).

JMS supports two messaging models: the point-to-point model and the publish-subscribe model. In the point-to-point model, messages are sent to a queue and each message is delivered to one consumer, although multiple producers can write to the queue. The publish-subscribe model is more like a bulletin board, where a message can have multiple consumers. In this model JMS clients address messages to a Topic.
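
The difference between the two models can be illustrated without a broker. The following is a minimal plain-Java sketch, not JMS code: the class and method names (DeliveryModels, deliverPointToPoint, deliverPublishSubscribe) are hypothetical, and the round-robin choice of consumer is an assumption made only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliveryModels {

  // Point-to-point: each message is handed to exactly one consumer.
  // Round-robin is used here purely for illustration.
  static List<List<String>> deliverPointToPoint(List<String> messages, int consumers) {
    List<List<String>> inboxes = newInboxes(consumers);
    for (int i = 0; i < messages.size(); i++) {
      inboxes.get(i % consumers).add(messages.get(i));
    }
    return inboxes;
  }

  // Publish-subscribe: every subscriber receives a copy of every message.
  static List<List<String>> deliverPublishSubscribe(List<String> messages, int subscribers) {
    List<List<String>> inboxes = newInboxes(subscribers);
    for (String message : messages) {
      for (List<String> inbox : inboxes) {
        inbox.add(message);
      }
    }
    return inboxes;
  }

  // Creates one empty inbox per consumer.
  private static List<List<String>> newInboxes(int n) {
    List<List<String>> inboxes = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      inboxes.add(new ArrayList<String>());
    }
    return inboxes;
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
    System.out.println("Queue: " + deliverPointToPoint(messages, 2));
    System.out.println("Topic: " + deliverPublishSubscribe(messages, 2));
  }
}
```

With a queue the four messages are split between the two consumers, whereas with a topic both subscribers see all four.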

Some of the common terminology used with JMS applications is listed below.

  1. JMS Provider - The underlying messaging system that implements the JMS APIs and supports the administrative and control features. ActiveMQ is a JMS Provider.
  2. JMS Client - An application that produces or consumes messages.
  3. Message - Objects used for communication between JMS clients.
  4. Administrative Objects - Preconfigured objects like connection factories and destinations that are created by an administrator and used by clients.

ActiveMQ Introduction and Configuration

Apache ActiveMQ is a popular and powerful open source messaging provider that fully supports the JMS API specification. There are several other JMS providers, including Oracle WebLogic, WebSphere MQ from IBM and OpenJMS, to name a few. For the purpose of this tutorial we will use ActiveMQ as the JMS provider.

Installation and configuration of ActiveMQ is pretty simple.
  1. Download the binary release for your platform from http://activemq.apache.org/.
  2. Unzip the downloaded file to a folder of your choice.
  3. Start ActiveMQ using the command below in a terminal window.
      C:\Installs\ApacheMQ\bin\activemq start

  4. Test that ActiveMQ is running correctly by checking that it is listening on port 61616.
      C:\Users\dummy>netstat -an | find "61616"
      TCP    0.0.0.0:61616    0.0.0.0:0     LISTENING
      TCP    [::]:61616       [::]:0        LISTENING

  5. ActiveMQ can be monitored through the Web Console at http://localhost:8161/admin. The username and password are admin/admin.
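
The port check can also be done from Java. This is a minimal sketch that assumes the broker runs on localhost and listens on port 61616 as in the steps above. The class name BrokerPortCheck and the method isListening are hypothetical helpers and not part of ActiveMQ. It only verifies that something accepts TCP connections on the port, not that it is actually a JMS broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

  // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // Connection refused, timed out or host unreachable
      return false;
    }
  }

  public static void main(String[] args) {
    // 61616 is the broker port checked with netstat above
    boolean up = isListening("localhost", 61616, 2000);
    System.out.println(up
        ? "Something is listening on port 61616"
        : "Nothing is listening on port 61616");
  }
}
```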

JMS Producer Example

In the JMS Producer application we use an ActiveMQ connection factory object to create a Connection to the underlying messaging system. Then we create a context for sending messages using the Session object. Next we create a Topic instance to get publish and subscribe behavior. MessageProducer is used for sending messages.

Note. Include "activemq-all-5.10.0.jar" available in the ActiveMQ installation directory as an external jar to the JMS Producer project.

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PublisherMain {
  
  public static void main (String[] args) {

    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    
    try {     
      // Get a connection for sending messages
      connectionFactory = new ActiveMQConnectionFactory();
      connection = connectionFactory.createConnection();
      connection.start();
      
      // Create a session
      Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
      
      // Create a topic and send messages
      Topic topic = session.createTopic("test1");
      MessageProducer producer = session.createProducer(topic);
      while ( true ) {
        TextMessage message = session.createTextMessage();
        message.setText("This is a test JMS Message");
        producer.send(message);
        Thread.sleep(10000);
      }
    } catch (JMSException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    finally {
      try {
        // connection is still null if createConnection() failed
        if (connection != null) {
          connection.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}

JMS Consumer Example


In the JMS Consumer application we use an ActiveMQ connection factory object to create a Connection to the underlying messaging system. Then we create a Session context and subscribe to the topic. MessageConsumer is used for receiving messages.

Note. Include "activemq-all-5.10.0.jar" available in the ActiveMQ installation directory as an external jar to the JMS Consumer project.

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Message;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ListenerMain {
  
  public static void main(String[] args) {
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    try {
      // Get a connection for receiving messages    
      connectionFactory = new ActiveMQConnectionFactory();
      connection = connectionFactory.createConnection();
      connection.start();
      
      // Create a session
      Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
      
      // Create a topic and subscribe
      Topic topic = session.createTopic("test1");       
      MessageConsumer consumer = session.createConsumer(topic);
          
      // Receive the messages and print
      while ( true ) {
        Message message = consumer.receive();
        
        if (message instanceof TextMessage) {
              TextMessage textMessage = (TextMessage) message;
              System.out.println("Received message: "
                      + textMessage.getText());
        }
      }
    } catch (JMSException e) {
      e.printStackTrace();
    }
    finally {
      try {
        // connection is still null if createConnection() failed
        if (connection != null) {
          connection.close();
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}

This program produces the following output.

Received message: This is a test JMS Message
Received message: This is a test JMS Message
Received message: This is a test JMS Message
Received message: This is a test JMS Message

Read other Java tutorials from here.
Thursday, September 25, 2014

ReentrantReadWriteLock Introduction

A read-write lock provides a greater level of concurrency than a mutual exclusion lock when working with shared data. Read-write locks allow simultaneous read-only operations by multiple threads, whereas a write operation can be performed by only one thread at a time. They typically result in better performance when read operations are more frequent than write operations.

Java ReentrantReadWriteLock

Java provides the ReadWriteLock interface in the java.util.concurrent.locks package. ReentrantReadWriteLock is an implementation of the ReadWriteLock interface. ReadWriteLock maintains a pair of locks, one for read-only operations and one for write operations. The read lock can be held by multiple threads simultaneously, whereas the write lock can be held by only one thread at a time. Some of the key APIs of ReentrantReadWriteLock are listed below.

  • ReentrantReadWriteLock() - Creates a new instance of ReentrantReadWriteLock.
  • ReentrantReadWriteLock(boolean fair) - Creates a new instance of ReentrantReadWriteLock with the given fairness policy.
  • ReentrantReadWriteLock.ReadLock readLock() - Returns the lock used for reading.
  • ReentrantReadWriteLock.WriteLock writeLock() - Returns the lock used for writing.
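
The APIs listed above are typically combined with try/finally so that a lock is released even if the guarded code throws. The following is a minimal sketch. GuardedCounter is a hypothetical class, and the fair policy is chosen only to show the second constructor.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class GuardedCounter {

  // true selects the fair ordering policy
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  private int value = 0;

  // Many threads may hold the read lock at the same time.
  int read() {
    lock.readLock().lock();
    try {
      return value;
    } finally {
      lock.readLock().unlock();
    }
  }

  // Only one thread at a time may hold the write lock.
  void increment() {
    lock.writeLock().lock();
    try {
      value++;
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final GuardedCounter counter = new GuardedCounter();
    Runnable writer = () -> {
      for (int i = 0; i < 1000; i++) {
        counter.increment();
      }
    };
    Thread t1 = new Thread(writer);
    Thread t2 = new Thread(writer);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // No increments are lost because writes are mutually exclusive
    System.out.println("Final value = " + counter.read());
  }
}
```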

Java ReentrantReadWriteLock Example

In this example a producer task requests the write lock and two reader tasks request the read lock. While the write lock is held, both reader threads are blocked. Once it is released, both reader threads proceed simultaneously since they request only read locks.

package com.sourcetricks.readwritelock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
  
  // Shared resources to be guarded
  private static int count = 0;

  // Producer thread. Counts up the resource.
  private static class ProducerTask implements Runnable {

    private ReentrantReadWriteLock lock = null;

    ProducerTask(ReentrantReadWriteLock lock) {
      this.lock = lock;
    }

    @Override
    public void run() {
      System.out.println("Producer requesting lock");
      lock.writeLock().lock();
      System.out.println("Producer gets lock");
      for (int i = 0; i < 5; i++) {
        count = count + 1;
        System.out.println("Counting up");
      }
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Producer releases lock");
      lock.writeLock().unlock();
    }

  }

  // Reader task
  private static class AnotherReadTask implements Runnable {

    private ReentrantReadWriteLock lock = null;

    AnotherReadTask(ReentrantReadWriteLock lock) {
      this.lock = lock;
    }

    @Override
    public void run() {
      System.out.println("Another Reader requesting lock");
      lock.readLock().lock();
      System.out.println("Another Reader gets lock");
      System.out.println("Current count = " + count);
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Another Reader releases lock");
      lock.readLock().unlock();
    }
  }
  
  // Reader task
  private static class ReadTask implements Runnable {

    private ReentrantReadWriteLock lock = null;

    ReadTask(ReentrantReadWriteLock lock) {
      this.lock = lock;
    }
    
    @Override
    public void run() {
      System.out.println("Reader requesting lock");
      lock.readLock().lock();
      System.out.println("Reader gets lock");
      System.out.println("Current count = " + count);
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Reader releases lock");
      lock.readLock().unlock();     
    }   
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    Thread producer = new Thread(new ProducerTask(lock));
    Thread anotherReader = new Thread(new AnotherReadTask(lock));
    Thread reader = new Thread(new ReadTask(lock));
    producer.start();
    reader.start();
    anotherReader.start();
  }
}

This program produces the following output.

Producer requesting lock
Reader requesting lock
Producer gets lock
Counting up
Counting up
Counting up
Counting up
Counting up
Another Reader requesting lock
Producer releases lock
Reader gets lock
Current count = 5
Another Reader gets lock
Current count = 5
Reader releases lock
Another Reader releases lock

Refer to this link for more Java Tutorials.
Monday, September 22, 2014

ReentrantLock Introduction

A lock is a tool to control access to a shared resource in a multi-threaded environment. A lock gives only one thread at a time access to the shared resource. Synchronized methods and statements use an implicit lock associated with an object. However, in certain scenarios we need explicit access to the lock object for flexibility. ReentrantLock is a mutual exclusion lock similar to the implicit lock used by synchronized methods and statements, but with additional flexibility.
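
The parallel between the implicit and the explicit lock can be sketched as follows. ImplicitVsExplicitLock is a hypothetical class written for illustration. Both counters are protected equally well, but only the explicit version exposes the lock object, which is why the release has to be written by hand in a finally block.

```java
import java.util.concurrent.locks.ReentrantLock;

class ImplicitVsExplicitLock {

  private int syncCount = 0;
  private int lockCount = 0;
  private final ReentrantLock lock = new ReentrantLock();

  // Implicit lock: the monitor of "this" is acquired and released automatically.
  synchronized void incrementSynchronized() {
    syncCount++;
  }

  // Explicit lock: acquire and release are separate calls.
  void incrementWithLock() {
    lock.lock();
    try {
      lockCount++;
    } finally {
      lock.unlock();
    }
  }

  synchronized int getSyncCount() {
    return syncCount;
  }

  int getLockCount() {
    lock.lock();
    try {
      return lockCount;
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final ImplicitVsExplicitLock demo = new ImplicitVsExplicitLock();
    Runnable work = () -> {
      for (int i = 0; i < 1000; i++) {
        demo.incrementSynchronized();
        demo.incrementWithLock();
      }
    };
    Thread a = new Thread(work);
    Thread b = new Thread(work);
    a.start();
    b.start();
    a.join();
    b.join();
    System.out.println("synchronized count = " + demo.getSyncCount());
    System.out.println("ReentrantLock count = " + demo.getLockCount());
  }
}
```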

Java ReentrantLock

Java provides the Lock interface in the package java.util.concurrent.locks. ReentrantLock implements the Lock interface and provides a reentrant mutual exclusion lock. Some of the key APIs of ReentrantLock are listed below.

  • ReentrantLock() - Creates ReentrantLock instance.
  • void lock() - Acquires the lock.
  • void unlock() - Releases the lock.
  • boolean tryLock() - Acquires the lock only if it is not held by another thread at the time of the call. Returns true if the lock was acquired and false otherwise, without blocking.
  • boolean isLocked() - Checks if the lock is held by any thread.
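
Since tryLock() does not block, its return value has to be checked before the guarded code runs, and unlock() should only be called when the lock was actually acquired. The following is a minimal sketch of that pattern. TryLockDemo and runIfAvailable are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

  // Runs the action only if the lock can be acquired right now. Never blocks.
  static boolean runIfAvailable(ReentrantLock lock, Runnable action) {
    if (!lock.tryLock()) {
      return false; // held by another thread, skip the work
    }
    try {
      action.run();
      return true;
    } finally {
      lock.unlock(); // only reached when tryLock() succeeded
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final ReentrantLock lock = new ReentrantLock();
    System.out.println("Lock is free: " + runIfAvailable(lock, () -> {}));

    lock.lock(); // the main thread now holds the lock
    try {
      final boolean[] result = new boolean[1];
      Thread other = new Thread(() -> result[0] = runIfAvailable(lock, () -> {}));
      other.start();
      other.join();
      System.out.println("isLocked: " + lock.isLocked());
      System.out.println("Other thread acquired: " + result[0]);
      // The owner can acquire the lock again because it is reentrant
      System.out.println("Owner acquired again: " + runIfAvailable(lock, () -> {}));
    } finally {
      lock.unlock();
    }
  }
}
```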

Java ReentrantLock Example

In this example we have a producer and consumer task using a ReentrantLock object to protect a shared resource. 

package com.sourcetricks.locks;

import java.util.concurrent.locks.ReentrantLock;

public class JavaLocksDemo {

    // Shared resources to be guarded
    private static int count = 0;
    
    // Producer thread. Counts up the resource.
    private static class ProducerTask implements Runnable {

      private ReentrantLock lock = null;
      
      ProducerTask(ReentrantLock lock) {
        this.lock = lock;
      }
      
      @Override
      public void run() {       
          System.out.println("Producer requesting lock");
          lock.lock();
          System.out.println("Producer gets lock");
          for ( int i = 0; i < 5; i++ ) {
            count = count + 1;
            System.out.println("Counting up");
          }
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("Producer releases lock");
          lock.unlock();
      }
      
    }
    
    // Consumer thread. Counts down the resource.
    private static class ConsumerTask implements Runnable {

      private ReentrantLock lock = null;
      
      ConsumerTask(ReentrantLock lock) {
        this.lock = lock;
      }
      
      @Override
      public void run() {
          System.out.println("Consumer requesting lock");
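          // lock() blocks until the producer releases the lock. tryLock() would
          // return false immediately while the producer holds the lock, and the
          // unlock() below would then throw IllegalMonitorStateException.
          lock.lock();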
          System.out.println("Consumer gets lock");
          for ( int i = 0; i < 5; i++ ) {
            count = count - 1;
            System.out.println("Counting down");
          }
          System.out.println("Consumer releases lock");         
          lock.unlock();
      }    
    }
    
  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    Thread producer = new Thread(new ProducerTask(lock));
    Thread consumer = new Thread(new ConsumerTask(lock));
    producer.start();
    consumer.start();
  }
}

This program produces the following output.

Producer requesting lock
Consumer requesting lock
Producer gets lock
Counting up
Counting up
Counting up
Counting up
Counting up
Producer releases lock
Consumer gets lock
Counting down
Counting down
Counting down
Counting down
Counting down
Consumer releases lock

Refer to Java Tutorials page for more tutorials.
Sunday, September 14, 2014

Atomic Operations Introduction

Traditional multi-threading approaches use locks to protect shared resources. Synchronization objects like semaphores provide mechanisms for the programmer to write code that doesn't modify a shared resource concurrently. These approaches block other threads while one thread is modifying a shared resource, and blocked threads do no meaningful work while waiting for the lock to be released.

Atomic operations, in contrast, are based on non-blocking algorithms in which threads contending for a shared resource are not suspended. Atomic operations are implemented using hardware primitives like compare-and-swap (CAS), an atomic instruction used for synchronization in multi-threaded code.
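
A typical non-blocking update is a retry loop around compare-and-set: read the current value, compute the new one, and publish it only if no other thread changed the value in between. The following is a minimal sketch using AtomicInteger. CasLoopDemo and doubleValue are hypothetical names, and whether compareAndSet maps to a single hardware CAS instruction depends on the JVM and the platform.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopDemo {

  // Atomically doubles the value using a compare-and-set retry loop.
  static int doubleValue(AtomicInteger value) {
    while (true) {
      int current = value.get();
      int next = current * 2;
      if (value.compareAndSet(current, next)) {
        return next; // nobody interfered, the update is published
      }
      // Another thread changed the value in between, so read it again and retry
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final AtomicInteger value = new AtomicInteger(1);
    Runnable task = () -> {
      for (int i = 0; i < 5; i++) {
        doubleValue(value);
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // 1 doubled ten times is 1024, whatever the interleaving
    System.out.println("Final value = " + value.get());
  }
}
```

No thread ever waits for a lock here. A thread that loses the race simply repeats its update with the fresh value.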

Java Atomic Classes

Java provides atomic classes that support lock-free, thread-safe programming on single variables. These classes are defined in the java.util.concurrent.atomic package. Some of the key classes include AtomicBoolean, AtomicInteger, AtomicLong, AtomicIntegerArray, AtomicLongArray and AtomicReference.

Some of the key APIs for AtomicInteger class include:
  • AtomicInteger() - Creates a new AtomicInteger that can be updated atomically. Initial value is set to 0.
  • AtomicInteger(int value) - Creates a new AtomicInteger with the specified initial value.
  • boolean compareAndSet(int expect, int value) - Atomically sets the value if the current value is the same as the expected value. Returns true if the update succeeded.
  • int getAndSet(int newVal) - Atomically sets to new value and returns old value.
  • int getAndIncrement() - Atomically increments the current value and returns old value.
  • int incrementAndGet() - Atomically increments the current value and returns new value.
  • int getAndAdd(int delta) - Atomically adds the delta to current value and returns old value.
Refer to this link for more details.
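
The difference between the methods listed above lies mostly in what they return. The following is a minimal single-threaded sketch that records each return value. AtomicIntegerApiDemo and trace are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIntegerApiDemo {

  // Applies each operation in turn and records what it returned.
  static List<String> trace() {
    List<String> out = new ArrayList<>();
    AtomicInteger n = new AtomicInteger(5);
    out.add("getAndIncrement -> " + n.getAndIncrement());        // old value 5, now 6
    out.add("incrementAndGet -> " + n.incrementAndGet());        // new value 7
    out.add("getAndAdd(10) -> " + n.getAndAdd(10));              // old value 7, now 17
    out.add("getAndSet(100) -> " + n.getAndSet(100));            // old value 17, now 100
    out.add("compareAndSet(100, 1) -> " + n.compareAndSet(100, 1)); // matches, now 1
    out.add("compareAndSet(100, 2) -> " + n.compareAndSet(100, 2)); // no match, stays 1
    out.add("get -> " + n.get());
    return out;
  }

  public static void main(String[] args) {
    for (String line : trace()) {
      System.out.println(line);
    }
  }
}
```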

Java AtomicInteger Example

In this example we have a producer and a consumer acting upon an AtomicInteger counter to get lock-free synchronization.

package com.sourcetricks.atomic;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicOpsExample {

  private static AtomicInteger count = new AtomicInteger(0);
  
  // Producer thread. Counts up the resource.
  private static class ProducerTask implements Runnable {

    @Override
      public void run() {
          for ( int i = 0; i < 10; i++ ) {           
            count.getAndIncrement();
            System.out.println("Counting up");   
          }                
      }     
  }

  // Consumer thread. Counts down the resource.
  private static class ConsumerTask implements Runnable {

    @Override
      public void run() {
          for ( int i = 0; i < 3; i++ ) {            
            count.getAndDecrement();
            System.out.println("Counting down");
          }                
      }     
  }
    
  public static void main(String[] args) {
    Thread producer = new Thread(new ProducerTask());
    Thread consumer = new Thread(new ConsumerTask());
    producer.start();
    consumer.start();   
  }
}

A sample run produces the following output. The interleaving of the two threads may vary between runs.

Counting up
Counting up
Counting up
Counting up
Counting up
Counting down
Counting down
Counting down
Counting up
Counting up
Counting up
Counting up
Counting up

Refer to this link for more Java Tutorials.

Exchanger Introduction

Exchanger is a synchronization point in which threads can pair and exchange objects. Exchanger simplifies data exchange between two threads. Exchanger waits till two threads reach the synchronization point and then exchanges the data provided by the threads.

Java Exchanger Class

Java supports the Exchanger synchronization object. Exchanger is a generic class and supports the following key APIs.
  • Exchanger() - Constructor to create the exchanger.
  • public V exchange(V x) - Waits for the partner thread to arrive at the exchange point and then swaps the objects.
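
Since exchange(V x) waits until a partner arrives, a thread without a partner waits indefinitely. Exchanger also has a timed overload, exchange(V x, long timeout, TimeUnit unit), which gives up with a TimeoutException. The following is a minimal sketch of its use. ExchangeWithTimeoutDemo and offer are hypothetical names, and returning null on timeout is a choice made for this example only.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExchangeWithTimeoutDemo {

  // Offers a value and waits up to timeoutMillis for a partner.
  // Returns the partner's value, or null if nobody arrived in time.
  static String offer(Exchanger<String> exchanger, String value, long timeoutMillis) {
    try {
      return exchanger.exchange(value, timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return null; // no partner reached the exchange point
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return null;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final Exchanger<String> exchanger = new Exchanger<String>();

    // No partner thread exists yet, so this call times out
    System.out.println("Alone: " + offer(exchanger, "ping", 200));

    // With a partner thread the two values are swapped
    Thread partner = new Thread(() ->
        System.out.println("Partner received: " + offer(exchanger, "pong", 2000)));
    partner.start();
    System.out.println("Main received: " + offer(exchanger, "ping", 2000));
    partner.join();
  }
}
```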

Java Exchanger Example

In this example, we create in the main() program an Exchanger instance for strings. This Exchanger object is then used to exchange strings between the two threads.

package com.sourcetricks.exchanger;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {

  private static class MyTask1 implements Runnable {

    Exchanger<String> exchanger;
    
    MyTask1(Exchanger<String> exchanger) {
      this.exchanger = exchanger;
    }
    
    @Override
    public void run() {
      String str = null;
      try {
        str = exchanger.exchange("Hello from MyTask1");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Received in MyTask1 = " + str);
    }
    
  }
  
  private static class MyTask2 implements Runnable {

    Exchanger<String> exchanger;
    
    MyTask2(Exchanger<String> exchanger) {
      this.exchanger = exchanger;
    }
    
    @Override
    public void run() {
      String str = null;
      try {
        str = exchanger.exchange("Hello from MyTask2");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("Received in MyTask2 = " + str);
    }
    
  }
  
  public static void main(String[] args) {
    
    Exchanger<String> exchanger = new Exchanger<String>();
    
    Thread t1 = new Thread(new MyTask1(exchanger));
    Thread t2 = new Thread(new MyTask2(exchanger));
    t1.start();
    t2.start();
  }
}

This program produces the following output.

Received in MyTask2 = Hello from MyTask1
Received in MyTask1 = Hello from MyTask2

Read other concurrency tutorials from Java Tutorials page.
