Read Azure IoT Hub messages via Service Bus queue using Java

In this post, you will learn, at a high level, how to read Azure IoT Hub messages through a Service Bus queue using a Java program.
So before starting, let’s see:

What is Azure IoT?

Azure IoT solutions are versatile and cover every aspect of IoT design and development, from connecting devices to delivering insights to decision-makers. Customers from different industries, including automotive, manufacturing, and energy, use this IoT platform to drive efficiency and intelligence to their operations.
Other cloud providers offer IoT solutions too, but in this post we are going to learn about Azure IoT Hub. Several blogs compare Azure IoT and AWS IoT. You can find a few of them below.
Before getting started, I presume that you have basic knowledge of Azure and its services. If you do not, you can visit this link.
The basic flow follows the architecture above. The request is made from the client side, which is a Java program in our case. The request is sent to the Azure Service Bus and lands on the queue, the queue request is then sent to the IoT device, and the IoT device responds accordingly.
So let’s start implementing. Log in to the Azure Portal, search for IoT Hub, and create an IoT hub:
  • Now click on Add button
  • Select your subscription
  • Select resource group
  • Select Location
  • Enter your IoT hub name and click Next
  • Select Pricing & scale tier
  • Next, add tags (optional)
  • Review and create
Once the IoT hub is created, you need to add an IoT device to it. Go to Explorers -> IoT devices and add a new device. Once the IoT device is created, we need to configure Service Bus with the IoT hub.
Note: There are other ways to read and write messages on IoT Hub; the choice depends on your requirements. In my case I want to read and write messages continuously, so I am using a Service Bus queue.
Now let’s create a Service Bus namespace. Search for Service Bus and click Create/Add:
  • Select subscription
  • Select resource group
  • Add a namespace name. This becomes part of the Service Bus endpoint URL.
  • Select Location
  • Select Price tier
  • Add tags (optional) and create
Once the Service Bus namespace is created, we need to create a queue on it.
Open the namespace you created and scroll to the Entities section. Select Queues and click the add icon to create a new queue:
  • Name the queue
  • Select queue size
  • Add Message time to live
  • Lock duration
  • Check the checkboxes as per your need and hit the Create button
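To make the two timing settings above more concrete, here is a minimal sketch in plain Java (no Azure SDK involved) of what they mean for a message. The class name, method names, and all values are hypothetical and chosen only for illustration; they are not portal defaults.

```java
import java.time.Duration;
import java.time.Instant;

class QueueTimings {

    // A message leaves the queue (or moves to the dead-letter queue, if that
    // checkbox is ticked) once its time to live has elapsed since it was enqueued.
    static boolean isExpired(Instant enqueuedAt, Duration timeToLive, Instant now) {
        return !now.isBefore(enqueuedAt.plus(timeToLive));
    }

    // In peek-lock mode the receiver owns a message only until the lock runs out.
    // After that the queue may hand the same message to a receiver again.
    static boolean lockStillHeld(Instant lockedAt, Duration lockDuration, Instant now) {
        return now.isBefore(lockedAt.plus(lockDuration));
    }

    public static void main(String[] args) {
        // Hypothetical settings, pick your own in the portal
        Duration timeToLive = Duration.ofDays(14);
        Duration lockDuration = Duration.ofSeconds(30);

        Instant enqueuedAt = Instant.parse("2024-01-01T00:00:00Z");
        Instant lockedAt = enqueuedAt.plusSeconds(5);
        // Pretend the receiver needed 45 seconds to process the message
        Instant finishedAt = lockedAt.plusSeconds(45);

        System.out.println("Expired when finished:   " + isExpired(enqueuedAt, timeToLive, finishedAt));
        System.out.println("Lock held when finished: " + lockStillHeld(lockedAt, lockDuration, finishedAt));
    }
}
```

In practice this means the lock duration should be longer than the time your program needs to process one message; otherwise the same message can be delivered again.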
Once the queue is created, it’s time to configure the IoT hub with the Service Bus queue.
To do this, go to the IoT hub you created, open the Events section, and click Event Subscription:
  • Name the Event Subscription
  • Select Event Types
  • Now in Endpoint details
    • Select endpoint type to Service bus queue
    • Click on the select endpoint
    • A side menu opens where you configure your queue endpoint, as shown in the image below.
      • Select Subscription
      • Select Resource Group
      • Select Service Bus Namespace
      • Select Service Bus Queue
Now we are done with the setup part. We just need a few keys to get access from the Java program.
  • Endpoint URL: To get the endpoint, go to your Service Bus namespace -> scroll to the Settings section -> open Shared access policies (SAS) and create a policy. After creating it, you’ll get a primary and a secondary connection string. These connection strings work as the endpoint URL. Copy one of them for later. Check the image below for reference.
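If you want to check that you copied the right value, the connection string is a list of Key=Value pairs separated by semicolons. The snippet below is a small sketch that splits it into its parts using only the standard library. The class name and the sample values are made up for illustration; the Azure SDK parses the string for you, so this is only a sanity check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectionStringParts {

    // Splits "Key=Value;Key=Value" pairs. Only the first '=' in each pair is
    // treated as the separator, because Base64 keys often end with '='.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        // Made-up values for illustration; paste your own connection string instead
        String sample = "Endpoint=sb://my-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=my-policy;SharedAccessKey=abc123=";
        Map<String, String> parts = parse(sample);
        System.out.println("Endpoint:    " + parts.get("Endpoint"));
        System.out.println("Policy name: " + parts.get("SharedAccessKeyName"));
        // Avoid printing the key itself, it is a secret
        System.out.println("Has key:     " + parts.containsKey("SharedAccessKey"));
    }
}
```

A string that lacks the Endpoint, SharedAccessKeyName, or SharedAccessKey part was probably truncated while copying.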
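Now that we are done with the setup part, it’s time to get our hands dirty 😀
Create a Java application with a main method and use the code below to read messages. The code depends on the azure-servicebus, gson, and commons-cli libraries, so add them to your build first.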
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import com.google.gson.Gson;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;

public class SendAndReceiveMessage {

    // Replace the placeholders with your own values. The connection string is the
    // one copied from the Shared access policy of the Service Bus namespace.
    static final String CONNECTIONSTRING = "<ENDPOINT_URL>";
    static final Gson GSON = new Gson();
    static final String QUEUE_NAME = "<QUEUE_NAME>";

    public void run(String connectionString) throws Exception {

        // Create a QueueClient instance for receiving using the connection string
        // builder.
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the
        // queue.
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, QUEUE_NAME),
                ReceiveMode.PEEKLOCK);
        // Register the handler; the client then delivers messages to it in the
        // background
        this.registerReceiver(receiveClient);

        // wait for ENTER or 10 seconds elapsing
        waitForEnter(10);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }


    void registerReceiver(QueueClient queueClient) throws Exception {

        // register the message handler callbacks with the queue client
        queueClient.registerMessageHandler(new IMessageHandler() {
            // callback invoked when the message handler loop has obtained a message
            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                // the received message is passed to the callback
                byte[] body = message.getBody();
                if (body != null) {
                    Map<?, ?> details = GSON.fromJson(new String(body, UTF_8), Map.class);

                    System.out.println("Message received: " + details);
                }
                // a completed future signals that processing has finished
                return CompletableFuture.completedFuture(null);
            }

            // callback invoked when the message handler has an exception to report
            @Override
            public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                // println instead of printf, so a '%' in the message is not read as a format
                System.out.println(exceptionPhase + "-" + throwable.getMessage());
            }
        });

    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            SendAndReceiveMessage app = new SendAndReceiveMessage();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = CONNECTIONSTRING;

            // parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // get overrides from the environment; getenv expects the NAME of an
            // environment variable, not the connection string itself. The name
            // used here is an assumption, pick any name you like.
            String env = System.getenv("SB_SAMPLES_CONNECTIONSTRING");
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }

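    private void waitForEnter(int seconds) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // returns as soon as either ENTER is pressed or the timeout elapses
            executor.invokeAny(Arrays.asList(() -> {
                System.in.read();
                return 0;
            }, () -> {
                Thread.sleep(seconds * 1000);
                return 0;
            }));
        } catch (Exception e) {
            // absorb
        } finally {
            // stop the helper threads once one of the two tasks has finished
            executor.shutdownNow();
        }
    }
}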
To test, run the Java program, then go to the IoT device we created earlier and click Message to Device. Add your message and click Send Message. Once you send the message, you will receive it in the Java program. Keep in mind that the program listens only until you press ENTER or 10 seconds have passed, so increase the value passed to waitForEnter if you need more time.
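The handler prints the whole parsed map. If you only care about a few fields, you can pick them out of that map. The sketch below assumes the message follows the Event Grid event schema (fields such as eventType, subject, and data) and that a device payload, when present, sits in data.body as Base64 text. Treat those field names as assumptions and compare them with what your own program prints. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

class EventSummary {

    // Builds a one-line summary from a parsed event map, like the one the
    // message handler prints. Missing fields simply show up as "null" or "".
    static String summarize(Map<String, Object> event) {
        Object type = event.get("eventType");
        Object subject = event.get("subject");
        String body = "";
        Object data = event.get("data");
        if (data instanceof Map) {
            Object rawBody = ((Map<?, ?>) data).get("body");
            if (rawBody instanceof String) {
                body = decodeIfBase64((String) rawBody);
            } else if (rawBody != null) {
                body = rawBody.toString();
            }
        }
        return type + " | " + subject + " | " + body;
    }

    // Heuristic: try to decode as Base64 and fall back to the raw text if the
    // value is not valid Base64. Plain text that happens to be valid Base64
    // would be decoded too, so adapt this if your payloads are never encoded.
    static String decodeIfBase64(String value) {
        try {
            return new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8);
        } catch (IllegalArgumentException notBase64) {
            return value;
        }
    }

    public static void main(String[] args) {
        // Hand-built sample event with made-up values, standing in for the parsed message
        String payload = "{\"temperature\":21.5}";
        Map<String, Object> data = new HashMap<>();
        data.put("body", Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8)));

        Map<String, Object> event = new HashMap<>();
        event.put("eventType", "Microsoft.Devices.DeviceTelemetry");
        event.put("subject", "devices/my-device");
        event.put("data", data);

        System.out.println(summarize(event));
    }
}
```

Inside the handler you could pass the parsed map to a method like summarize instead of printing the map directly.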
Hooray! We are done.

About Author

Deepak Kumbhar

Deepak is a full-stack developer with around 3 years of experience. He is an expert in building Java-integrated web applications and creating REST APIs with well-designed, testable, efficient, and optimized code. He has experience in web technologies like AngularJS, Angular6, JavaScript, etc. He is a lead developer at Codegrip. He loves solving technical problems and helping others.