Microsoft has launched a preview for Azure IoT hub a while back. I didn't have the time back then
to check it out, but now that it has had some time to settle I think it's a good time to check things out.

This time however I'm not going to try the .NET API which is a first class language if you're working with
Azure. Instead I'm going to see what it does when you try to use it from the Scala/Java perspective.

What is Azure IoT hub and what does it offer you?

A typical IoT solution these days is made up of a couple of devices connected to some sort of control
centre in a cloud.

Another property of such a solution is that the devices in a typical IoT solution will generate events
that need to be handled by the central control centre in order to get value out of these events.

Depending on what you do, these events can generate quite a big stream of data. Take for example appliances
like the fitbit. There are a lot of customers that have this device and each of these devices generates
quite a few events for things like heartrate, steps etc.

How are you going to handle this type of load?

I don't know what fitbit is using, but you'd typically need something that scales beyond a single machine.
It also needs to be async so that you can handle a huge load on a single machine to keep things cheap.

Microsoft offers a solution that does just that. Azure IoT Hub is an IoT oriented message hub that allows
you to exchange messages between IoT devices and services using a number of protocols.
Right now it supports HTTPS, MQTT and AMQPS. All of them secured using so-called Shared Access Keys. This
means that the solution is secured, so you know that the measurements are coming from a trusted location.

Get the software

To write a device application that connects to the Azure IoT hub you need the Azure IoT hub SDK for java.
The SDK is hosted on Github. As of the moment of writing there
isn't a build available through maven so you have to make one yourself.

I used the following commands to get a build

git clone https://github.com/Azure/azure-iot-sdks
cd azure-iot-sdks/java/device/iothub-java-client
mvn compile install -Dmaven.test.skip=true

After you compile and install the azure-iot-sdk locally in your maven repository you need to modify
the build file for your Scala program to include the dependency:

resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
    "com.microsoft.azure.iothub-java-client" % "iothub-java-client" % "1.0.0-preview.8"
)

Keep in mind the dependency can only be found in the local maven repository. So add the local
maven repository to the resolvers in the build file. Without this you will get a build error.

Setting things up

Before you can actually talk to the IoT hub on Azure you need to set one up. Thinglabs has
an excellent guide to get you started.

Once you have set up a new IoT hub in Azure and registered a device, save the Device Identifier and
Device Key somewhere, you're going to need this in the application.

Programming against the IoT Hub

Azure IoT hub, as I mentioned in the beginning of this post, has a good API for .NET. It offers a Nuget
package that you can download and use from C#. But how does it do in Java/Scala?

The foundations look pretty solid. Even from Scala the API is pretty easy to use. Setting up a client
is pretty straightforward:

val connectionString = "HostName=[host];DeviceId=device01;SharedAccessKey=[key]"
val connector = new DeviceClient(connectionString,IotHubClientProtocol.HTTPS)

It is important to keep in mind here is the hostname of your IoT hub instance in Azure and the device key
which you need to enter as the SharedAccessKey. Please make sure that you enter a valid Base64 key.
I ran into problems when trying things out, because I had an invalid key. It wasn't until I manually
tried to decode the key that I discovered I actually had a copy-paste problem.

The documentation is quite limited for the Java software, so this sort of thing is quite easy to get wrong.

After I fixed my key problem it was quite easy to get messages delivered to the IoT hub. I wrote
an actor using ReactivePI and connected it through a IotHubConnector actor.

class EventHubConnector(connectionString: String, deviceIdentifier: String) extends Actor
with ActorLogging with IotHubProtocol {

  import EventHubConnector._

  val connector = new DeviceClient(connectionString,IotHubClientProtocol.HTTPS)

  val eventCallback = new IotHubEventCallback {
    override def execute(iotHubStatusCode: IotHubStatusCode, o: scala.Any) =  {
      // Make sure that the status is correct.
      // Authorization errors should be raised as fatal as there's no way to recover from this scenario
      // Everything else, retry and see if that fixes the problem.
      iotHubStatusCode match {
        case IotHubStatusCode.OK | IotHubStatusCode.OK_EMPTY => log.info("Delivered measurement")
        case IotHubStatusCode.UNAUTHORIZED => throw new FatalHubException("Device key or identifier is invalid")
        case _ => throw new HubException(s"Request failed: $iotHubStatusCode")
      }
    }
  }

  def receive = {
    // Deliver incoming measurements to the IoT hub in azure.
    case event:Measurement => sendEvent(event)
  }

  private def sendEvent(measurement: Measurement) = {
    import spray.json._

    val msg = new Message(measurement.toJson.toString())

    connector.sendEventAsync(msg,eventCallback,null)
  }

  override def postStop() = {
    connector.close()
  }


  override def preStart() = {
    connector.open()
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    // Reprocess the message when the original issue was caused by a delivery failure.
    // If something else caused the problem, mark the message as unhandled and continue.
    (reason,message) match {
      case (_: HubException,Some(msg)) => self.tell(msg,sender)
      case (_:Exception,Some(msg)) => unhandled(msg)
      case (_,_) => // Do nothing in this case, nothing to be processed.
    }
  }

  // Automatically restart for a maximum of 3 attempts when the delivery of a message failed.
  // If something fatal happens, stop the process entirely and escalate the error.
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
    case ex: FatalHubException =>
      log.error(ex,"Failed to deliver event due to fatal error")
      Escalate
    case _ =>
      log.error("Failed to deliver event due to temporary problem with connector")
      Restart
  }
}

The code in the sample is quite a bit to take in at once. So let's break it down bit by bit.
The connector itself is an actor, this means it receives messages. Right now it receives
only one message, a measurement. When it receives the measurement it will try to post it to
the Azure IoT hub.

def receive = {
  // Deliver incoming measurements to the IoT hub in azure.
  case event:Measurement => sendEvent(event)
}

private def sendEvent(measurement: Measurement) = {
  import spray.json._

  val msg = new Message(measurement.toJson.toString())

  connector.sendEventAsync(msg,eventCallback,null)
}

To post a new measurement to the IoT hub the connector needs an actual connection.
Connecting to an IoT hub is done by creating a new instance of the DeviceClient class.

val connector = new DeviceClient(connectionString,IotHubClientProtocol.HTTPS)

The DeviceClient class supports multiple kinds of transports. Namely AMQPS, HTTPS and MQTT.
The first one is actually a queueing protocol implemented by quite a few products such as RabbitMQ
and Azure EventBus. HTTPS is the good old secure HTTP protocol that we know and love.
MQTT is also a queueing protocol, but mostly used in IoT devices that don't have a lot of processing power.

I have tried all of them, but found that AMQPS and MQTT don't seem to work right now. MQTT gives a null reference error. The AMQPS seems to be unable to connect. So if you plan on using the preview use the HTTP
transport for the device client.

One of the important things to get right in a connector like the one I built is that it is capable
of retrying failed deliveries. You don't want to miss events due to connection problems.

Luckily it's not that hard to build a retry mechanism in akka, so I extended the actor
with a retry policy.

// Automatically restart for a maximum of 3 attempts when the delivery of a message failed.
// If something fatal happens, stop the process entirely and escalate the error.
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
  case ex: FatalHubException =>
    log.error(ex,"Failed to deliver event due to fatal error")
    Escalate
  case _ =>
    log.error("Failed to deliver event due to temporary problem with connector")
    Restart
}

The actor will generate a HubException when a delivery fails or a FatalHubException when
something is really wrong with the actor. In case of a regular HubException we try to restart
the actor. In the case of a FatalHubException we stop the actor since there's no way the problem
will resolve itself.

Upon restart we want to retry the message. You can make the actor reprocess a message by overriding
the preRestart method of the actor. In this method you get the original error that caused the restart
and the original message that wasn't processed.

override def preRestart(reason: Throwable, message: Option[Any]) = {
  // Reprocess the message when the original issue was caused by a delivery failure.
  // If something else caused the problem, mark the message as unhandled and continue.
  (reason,message) match {
    case (_: HubException,Some(msg)) => self.tell(msg,sender)
    case (_:Exception,Some(msg)) => unhandled(msg)
    case (_,_) => // Do nothing in this case, nothing to be processed.
  }
}

I used pattern matching to find out what happened and how to handle the situation.

In case we get a HubException (the delivery failed) and we got a message to process, repost the
message to the actor using the original sender.

In case we get an exception but have a message, we don't repost the message as its clearly problematic
to process the message. Instead we mark it as unhandled. Unhandled message get posted to the deadletter queue
in akka. If you want to handle this situation you need to write some logic to receive messages from
the deadletter queue and alert an admin or something.

In all other cases we don't do anything and accept the fact that there's something unknown and
there's no message we need to process in the actor.

The only thing left to do after implementing the IoT hub connector is to send some measurements to it.
For that I modified the
temperature-sensor
project a bit so that it posts the temperature measurements to the connector.

connector ! IotHubConnector.Measurement("temperature", currentTemperature)

The connector itself is initialized using the following piece of code:

val hubConnector = system.actorOf(EventHubConnector.props(connectionString, "device03"), "hub-connector")

I like to give my actors a proper name so that I can easily debug things if something doesn't go the way
I want it to go.

Final thoughts

Talking to Azure IoT hub from Scala right now is a bit difficult to set up, but I didn't expect anything
else from a technical preview. I think with a few more pull requests and a bit more help from Microsoft
it could actually be quite useful in combination with libraries like ReactivePI.

It's definitely worth a shot if you ask me.