This didn’t sound right to me and a lot of questions were arising in my mind,
- There are so many open source and commercial brokers available, doesn’t any of them allow saving data to a database?
- So many IOT platforms including AWS, Google & IBM support MQTT, How are they able to store the incoming MQTT messages?
The quest for a free MQTT broker which allows to save messages to a database :
This lead me into researching more into the open source MQTT brokers like Mosquitto, But even the most popular Mosquitto broker was also not supporting any customization to save data to a database, which is the most essential next step of concentrating the received data from IOT devices.
There i came across this awesome Open Source MQTT Broker called EMQ (Erlang MQTT Broker) defined as below on their site,
EMQ (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP. EMQ is fully open source and licensed under the Apache Version 2.0. EMQ implements both MQTT V3.1 and V3.1.1 protocol specifications, and supports MQTT-SN, CoAP, WebSocket, STOMP and SockJS at the same time.
This broker is open source, and the code is hosted on GitHub. They also have enterprise version available (Everyone needs money to survive 🙂 )
First thing what i caught my attention is the word ERLANG,
What is ERLANG ?
ERLANG is a programming language used for creating concurrent applications which need to be scaled massively and need high availaiblility. The banking software’s and ecommerce software’s are written in this.
For me, its weird and cryptic and doesn’t look similar to any programming language i have known.
How is EMQ different?
But good thing about this EMQ broker is that it allows to write PLUGINS, These plugins can be used to tap the incoming MQTT messages as well as outgoing messages using the HOOKS. Hooks are some functions which are called when certain event occurs like,
Hook | Description |
---|---|
client.connected | Run when client connected to the broker successfully |
client.subscribe | Run before client subscribes topics |
client.unsubscribe | Run when client unsubscribes topics |
session.subscribed | Run After client(session) subscribed a topic |
session.unsubscribed | Run After client(session) unsubscribed a topic |
message.publish | Run when a MQTT message is published |
message.delivered | Run when a MQTT message is delivered |
message.acked | Run when a MQTT message is acked |
client.disconnected | Run when client disconnected from broker |
These are the possible hooks we can use and our code gets called whenever these events occur, For example to save the incoming message to a MySQL database what i did is, I used the REST API endpoint created by using Dreamfactory (Open source REST backend) and posted the incoming MQTT messages to this endpoint as shown below,
on_message_publish(Message, _Env) -> io:format("publish ~s~n", [emqttd_message:format(Message)]), MessageBin = element(12, Message), MessageStr = binary_to_list(MessageBin), inets:start(), Method = post, URL = "http://127.0.0.1/api/v2/mysql/_table/log", Header = [{"X-DreamFactory-Api-Key", "7d1a4baac442ed875b1545af24f9f4312a1cfdb3fad0db096db91cf2869d17f2"}], Type = "application/json", Body = MessageStr, HTTPOptions = [], Options = [], R = httpc:request(Method, {URL, Header, Type, Body}, HTTPOptions, Options), {ok, Message}.
As you can see above the code is written in ERLANG and i have tapped the hook called on_message_publish() which gets called whenever a device/client publishes a message to the broker, So, I am posting all the messages which are coming into the broker to the database using HTTP REST API.
Here in this example, I am posting a JSON string which has values of latitude, longitude, time, Device ID
{“resource”:[{“lat”:”344″,”lon”:”123″,”time”:”2018-02-19 11:00:18″,”devid”:”Hello”}]}
Data saved on MySQL database fetched and viewed using Dreamfactory REST API :
This whole string comes in a MQTT message and gets posted to the database as shown below,
So, In this way i was able to bridge the gap between publishing messages to my MQTT broker to saving the messages to a MySQL database. I chose to do a REST API based post to database because ERLANG is very new to me and i don’t know how to directly post messages to MySQL database. I got help from good people at StackOverflow especially 7Stud and AlexL to implement this. Here is the link on the discussion.
Below is a demo video on how to install these two Open Source Softwares, EMQ and Dreamfactory side by side on a AWS server and link them to save MQTT messages to a database
Demo Video :