Technology Sharing

Building a multi-protocol IoT cloud platform based on EMQX Flask InfluxDB Grafana: MQTT/HTTP device access and data visualization process (with code examples)

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Summary:This article introduces the core concepts of IoT, cloud platform, MQTT, HTTP, data visualization, etc., and combines mainstream tools such as EMQX, Flask, InfluxDB, Grafana, etc. to teach you how to build an IoT cloud platform that supports multiple protocols. The article has a clear structure, rich pictures and texts, and detailed and easy-to-understand codes, aiming to help readers quickly master the core technologies of building an IoT cloud platform.

Key words:IoT, cloud platform, MQTT, HTTP, data visualization, EMQX, Flask, InfluxDB, Grafana


1. Basic knowledge of IoT

1.1 Overview of the Internet of Things

The Internet of Things (IoT) refers to the use of various information sensors, radio frequency identification technology, global positioning systems, etc. to collect real-time data on any object or process that needs to be monitored, connected, and interacted with, thereby achieving ubiquitous connection between objects and objects, and objects and people, and further realizing intelligent perception, identification, and management of objects and processes.

1.2 IoT Architecture

The IoT system architecture is usually divided into three layers:

  • Perception layer:Responsible for collecting data, including various sensors, RFID tags, GPS modules, etc.
  • Network layer:Responsible for data transmission, including various network protocols, communication technologies, such as WiFi, Bluetooth, Zigbee, NB-IoT, etc.
  • Application layer:Responsible for data processing and application presentation, such as data analysis, remote control, intelligent decision-making, etc.

2. Cloud Platform and Data Visualization

2.1 Cloud Platform

Cloud platform refers to the increase, use and delivery model of Internet-based related services, which usually involves providing dynamic, scalable and often virtualized resources through the Internet. Cloud platform can provide powerful computing, storage and network resources for IoT applications, reducing the cost of IoT application development and deployment.

2.2 Data Visualization

Data visualization refers to the display of data in visual forms such as graphics and charts to help users understand the data more intuitively and gain insight into the patterns and trends behind the data.

3. Introduction to Common Protocols and Tools

3.1 MQTT Protocol

MQTT (Message Queuing Telemetry Transport) is a lightweight message publish/subscribe protocol designed for low-bandwidth, low-power devices and networks. MQTT is widely used in the field of Internet of Things, especially for resource-constrained devices and unreliable network environments.

3.2 HTTP Protocol

HTTP (Hypertext Transfer Protocol) is an application layer protocol used to transfer information between web browsers and web servers. The HTTP protocol is simple and easy to use and is widely used in various network applications, including the field of Internet of Things.

3.3 EMQX

EMQX is an open source, high-performance, and scalable MQTT message server that supports millions of concurrent connections and message throughput.

3.4 Flask

Flask is a lightweight web application framework written in Python. It is easy to learn and use and suitable for quickly building web applications and API interfaces.

3.5 InfluxDB

InfluxDB is an open source time series database designed for storing and querying time series data. It is suitable for storing IoT sensor data, monitoring data, etc.

3.6 Grafana

Grafana is an open source data visualization tool that can connect to multiple data sources to create beautiful and powerful dashboards to display data in real time.

4. Construction of Multi-protocol IoT Cloud Platform

This project will build an IoT cloud platform that supports MQTT and HTTP protocols to achieve the following functions:

  • Multi-protocol support: Supports device access via both MQTT and HTTP protocols.
  • Data collection and storage: Collect data from different protocol devices in real time and store them in the InfluxDB database.
  • data visualization: Use Grafana to visualize the collected data.
4.1 System Architecture

 

4.2 Code Implementation

1. HTTP server setup (Flask)

  1. # 导入 Flask 库
  2. from flask import Flask, request, jsonify
  3. # 创建 Flask 应用
  4. app = Flask(__name__)
  5. # 定义 HTTP 接口,接收 POST 请求
  6. @app.route('/data', methods=['POST'])
  7. def receive_data():
  8. # 获取请求数据
  9. data = request.get_json()
  10. # 数据处理逻辑,例如数据校验、格式转换等
  11. # ...
  12. # 将数据写入 InfluxDB (示例)
  13. from influxdb import InfluxDBClient
  14. client = InfluxDBClient('localhost', 8086, 'user', 'password', 'iot_data')
  15. json_body = [
  16. {
  17. "measurement": "sensor_data",
  18. "tags": {
  19. "sensor_id": data.get("sensor_id")
  20. },
  21. "fields": {
  22. "temperature": data.get("temperature"),
  23. "humidity": data.get("humidity")
  24. }
  25. }
  26. ]
  27. client.write_points(json_body)
  28. # 返回响应
  29. return jsonify({'message': 'Data received successfully!'}), 200
  30. # 启动 Flask 应用
  31. if __name__ == '__main__':
  32. app.run(debug=True)

Code Description:

  • useFlaskThe framework creates an HTTP server and defines/dataThe interface receives POST requests.
  • userequest.get_json()Get the JSON data in the HTTP request.
  • Perform data processing, such as data verification, format conversion, etc.
  • useInfluxDBClientConnect to the InfluxDB database and write data to the database.
  • Returns a response in JSON format to inform the client that the data was received successfully.

2. MQTT message processing (Python)

  1. # 导入必要的库
  2. import paho.mqtt.client as mqtt
  3. from influxdb import InfluxDBClient
  4. import json
  5. # MQTT Broker 配置
  6. MQTT_BROKER = "localhost"
  7. MQTT_PORT = 1883
  8. MQTT_TOPIC = "sensor/data"
  9. # InfluxDB 配置
  10. INFLUXDB_HOST = "localhost"
  11. INFLUXDB_PORT = 8086
  12. INFLUXDB_USER = "user"
  13. INFLUXDB_PASSWORD = "password"
  14. INFLUXDB_DATABASE = "iot_data"
  15. # 创建 InfluxDB 客户端
  16. influxdb_client = InfluxDBClient(
  17. host=INFLUXDB_HOST,
  18. port=INFLUXDB_PORT,
  19. username=INFLUXDB_USER,
  20. password=INFLUXDB_PASSWORD,
  21. database=INFLUXDB_DATABASE
  22. )
  23. # 连接到 MQTT Broker
  24. def on_connect(client, userdata, flags, rc):
  25. print("Connected to MQTT Broker with result code " + str(rc))
  26. client.subscribe(MQTT_TOPIC)
  27. # 接收 MQTT 消息
  28. def on_message(client, userdata, msg):
  29. # 解析数据
  30. data = json.loads(msg.payload.decode())
  31. # 构建 InfluxDB 数据点
  32. influxdb_data = [
  33. {
  34. "measurement": "sensor_data",
  35. "tags": {
  36. "sensor_id": data.get("sensor_id"),
  37. },
  38. "fields": {
  39. "temperature": data.get("temperature"),
  40. "humidity": data.get("humidity"),
  41. }
  42. }
  43. ]
  44. # 写入 InfluxDB
  45. influxdb_client.write_points(influxdb_data)
  46. print("Data written to InfluxDB: " + str(influxdb_data))
  47. # 创建 MQTT 客户端
  48. mqtt_client = mqtt.Client()
  49. mqtt_client.on_connect = on_connect
  50. mqtt_client.on_message = on_message
  51. mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
  52. # 启动 MQTT 客户端
  53. mqtt_client.loop_start()
  54. # 保持程序运行
  55. while True:
  56. pass

Code Description:

  • usepaho.mqtt.clientConnect to the MQTT Broker and subscribe to the specified topic.
  • When an MQTT message is received, usejson.loads()Parse the message content.
  • Construct the parsed data into InfluxDB data point format.
  • useinfluxdb_client.write_points()Write data to the InfluxDB database.

3. Data Visualization (Grafana)

  • Install Grafana and configure the data source to connect to the InfluxDB database.
  • Create a dashboard and add charts such as line charts, bar charts, etc. to the dashboard.
  • Configure the data source of the chart to be InfluxDB, and write a query statement to obtain data from InfluxDB.
  • Configure the chart's style, title, axis and other properties as needed to make data presentation more intuitive and easy to understand.

Example Grafana query:

SELECT "temperature", "humidity" FROM "sensor_data" WHERE time > now() - 1h

The query will besensor_dataQuery the temperature and humidity data of the last hour in measurement.

This project combines tools such as Flask, EMQX, InfluxDB and Grafana to build an IoT cloud platform that supports MQTT and HTTP protocols, and realizes data collection, storage and visualization. The platform can be flexibly expanded to support more types of devices and protocol access, and can be customized according to actual needs.

Notice:

  • The above code is for reference only and needs to be modified according to specific needs in actual applications.
  • Please make sure that all dependent libraries are installed, such aspaho-mqttinfluxdbflaskwait.
  • In actual deployment, factors such as data security and system stability need to be considered.

If you want specific codes and ideas, you can send me a private message! ! !