21  MQTT

21.1 Intro

In today’s session, we will look at MQTT and its relevance to agriculture. MQTT, originally short for MQ Telemetry Transport (and often expanded as Message Queuing Telemetry Transport), is a lightweight messaging protocol designed for constrained devices and low-bandwidth networks.

In the realm of AgroTech, where the Internet of Things (IoT) plays a crucial role, MQTT offers a powerful solution for connecting sensors, actuators, and other devices in agricultural systems. It provides a reliable and scalable means of communication, enabling seamless data exchange between devices, networks, and applications.

One of the primary advantages of MQTT is its lightweight nature, making it ideal for resource-constrained devices such as the ESP32 microcontroller. With its small code footprint and minimal network overhead, MQTT facilitates efficient data transmission even in remote or challenging agricultural environments.

Through the use of MQTT, farmers and agriculturalists can gather real-time information about various aspects of their operations, including temperature, humidity, soil moisture, and crop health. This data can then be transmitted to centralized servers or cloud platforms, allowing for intelligent analysis, decision-making, and automation in the agricultural domain.

Furthermore, MQTT follows a publish-subscribe messaging pattern, enabling a scalable and flexible communication model. Devices can publish data to specific topics, and other devices or applications can subscribe to these topics to receive relevant information. This decoupled architecture ensures that data is delivered only to those who need it, minimizing network congestion and optimizing system performance.

In this AgroTech class, we will explore the implementation of MQTT on the ESP32 microcontroller, enabling you to build innovative and efficient solutions for the agricultural sector. So, let’s dive in and discover how MQTT can revolutionize the way we approach farming, enhance productivity, and contribute to sustainable agriculture practices.

21.2 Fundamental Concepts

  1. MQTT Broker:
    At the core of MQTT is the broker, which acts as a central hub for message exchange. The broker receives messages published by clients and distributes them to the interested subscribers. It is responsible for routing messages based on topics and maintaining the overall communication flow. For our class today we will use the free public HiveMQ broker.

  2. MQTT Client:
    Clients are devices or applications that connect to the MQTT broker. They can be sensors, actuators, microcontrollers, or any device capable of sending or receiving messages. MQTT clients can function as both publishers and subscribers, allowing them to send data and receive updates from the broker. For our class we can use the HiveMQ online client for testing.

  3. Topic:
    Topics are hierarchical strings that act as labels for messages. They serve as a way to categorize and organize information. When publishing a message, clients assign it to a specific topic. Subscribers can then choose to receive messages from specific topics of interest. For example, in agriculture, topics could represent sensor data such as “temperature,” “humidity,” or “soil moisture.”

  4. Publish:
    Publishing refers to the action of sending a message from an MQTT client to the broker. When a client publishes a message, it specifies the topic to which the message belongs. The broker then distributes the message to all the interested subscribers that have subscribed to that particular topic.

  5. Subscribe:
    Subscribing allows clients to express their interest in receiving messages from specific topics. By subscribing to a topic, a client informs the broker that it wants to receive messages published to that topic. Subscriptions can be specific to a particular topic (e.g., temperature) or can use wildcard characters to subscribe to multiple topics (e.g., agriculture/+/moisture to subscribe to all moisture data in different areas of agriculture).
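The five concepts above can be tried out without any network. The following is a minimal sketch, not how a real broker is implemented: `ToyBroker` and its methods are hypothetical names invented for this illustration, and it routes on exact topic strings only (no wildcards, no network, no QoS).

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str, str], None]  # a subscriber callback: (topic, payload)


class ToyBroker:
    """In-memory stand-in for a broker: routes each message by exact topic."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        # A client registers its interest in one topic.
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: str) -> int:
        # Deliver the message to every subscriber of this topic and
        # return how many subscribers received it.
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


broker = ToyBroker()
received = []
broker.subscribe("agriculture/field1/humidity", lambda t, p: received.append((t, p)))

delivered = broker.publish("agriculture/field1/humidity", "61.5")
ignored = broker.publish("agriculture/field1/temperature", "24.0")
print(received, delivered, ignored)
```

The publisher never learns who the subscribers are; it only names a topic. The second message reaches nobody because no client subscribed to that topic.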

21.3 A bit more about topics

Topics in MQTT are written as hierarchical strings, consisting of one or more levels separated by forward slashes (/). Each level within a topic provides a level of categorization and organization for the messages.

For example, consider the topic structure in agriculture:

  • agriculture/temperature: This topic represents the temperature data in the agricultural context. It consists of two levels: agriculture and temperature.

  • agriculture/field1/humidity: This topic represents the humidity data in a specific field. It consists of three levels: agriculture, field1, and humidity.

  • agriculture/+/soil moisture: This topic uses a wildcard character, +, which can match exactly one topic level. Subscribing to this topic would allow receiving messages related to soil moisture from various areas within agriculture.

  • agriculture/#: This topic uses the wildcard character #, which can match multiple levels or no level at all. Subscribing to this topic would enable receiving messages from any sub-topic under agriculture.

The use of forward slashes and levels in topic design allows for a structured approach to organizing data in MQTT. It enables subscribers to select specific levels or use wildcard characters to filter and receive messages of interest.

When publishing a message, the client specifies the topic to which the message belongs, ensuring that it is appropriately categorized and can be received by subscribers who are interested in that specific topic.

Understanding how topics are written and structured is crucial for effective message routing and data organization within MQTT-based AgroTech applications.
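To make the wildcard rules concrete, here is a small sketch of how a subscription filter can be compared with a published topic. The function name `topic_matches` is made up for this example, and the sketch is simplified: it does not handle the special treatment of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a published topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level itself and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs ('+' accepts any one level)

    # Without '#', both must have exactly the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("agriculture/+/humidity", "agriculture/field1/humidity"))
print(topic_matches("agriculture/+/humidity", "agriculture/field1/zone2/humidity"))
print(topic_matches("agriculture/#", "agriculture/field1/humidity"))
print(topic_matches("agriculture/#", "agriculture"))
```

The first, third, and fourth calls print True. The second prints False because `+` stands for exactly one level, while the topic has two levels between agriculture and humidity.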

21.4 LED subscribe

// adapted from:
//https://microcontrollerslab.com/esp32-mqtt-client-publish-subscribe-bme280-readings-hivemq/

#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient); 

const char* ssid = "wifi_name";
const char* password = "wifi_password";

const char* mqttServer = "broker.hivemq.com";  // string literals must be pointed to by const char*
int mqttPort = 1883;

int LED_pin = A4;
String msgString; // Declare msgString variable in the global scope

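// Forward declaration: callback() is defined at the end of the file.
// (The Arduino IDE generates this for .ino files; other build systems do not.)
void callback(char* topic, byte* message, unsigned int length);

void setupMQTT() {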
  mqttClient.setServer(mqttServer, mqttPort);
  mqttClient.setCallback(callback);
}

void reconnect() {
  Serial.println("Connecting to MQTT Broker...");
  while (!mqttClient.connected()) {
      Serial.println("Reconnecting to MQTT Broker..");
      String clientId = "ESP32Client-";
      clientId += String(random(0xffff), HEX);
      
      if (mqttClient.connect(clientId.c_str())) {
        Serial.println("Connected.");
        // subscribe to topic
        mqttClient.subscribe("agrotech/mqtt_tutorial/LED");
      }      
  }
}

void setup() {
  Serial.begin(115200);
  WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
      delay(500);
      Serial.print(".");
    } 
    Serial.println("");
     Serial.println("Connected to Wi-Fi");

  setupMQTT();

  pinMode(LED_pin, OUTPUT);
  digitalWrite(LED_pin, LOW); 
  
}


void loop() {
  if (!mqttClient.connected()) {
      reconnect();
  }
    
  mqttClient.loop();  // process incoming messages and keep the connection alive
}



void callback(char* topic, byte* message, unsigned int length) {
  // Convert byte array to string
  msgString = String((char*)message, length);

  // Convert string to integer
  int ledState = msgString.toInt();

  Serial.print("Message: ");
  Serial.println(msgString);
  Serial.print("LED State: ");
  Serial.println(ledState);

  if (ledState == 1) {
    digitalWrite(LED_pin, HIGH);  // Set pin A4 high
  } else if (ledState == 0) {
    digitalWrite(LED_pin, LOW);   // Set pin A4 low
  }
}
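To test the LED sketch from a computer instead of the online client, a one-shot publisher is enough. The sketch below assumes the paho-mqtt package is installed and uses its `paho.mqtt.publish.single` helper; the function names `led_payload` and `set_led` are invented for this example.

```python
BROKER = "broker.hivemq.com"
LED_TOPIC = "agrotech/mqtt_tutorial/LED"  # same topic the ESP32 subscribes to


def led_payload(on: bool) -> str:
    """The ESP32 callback converts the payload with toInt(), so send "1" or "0"."""
    return "1" if on else "0"


def set_led(on: bool) -> None:
    """Connect to the broker, publish one message, and disconnect."""
    # Imported here so led_payload() can be used without paho-mqtt installed.
    import paho.mqtt.publish as publish

    publish.single(LED_TOPIC, payload=led_payload(on), hostname=BROKER, port=1883)
```

Calling `set_led(True)` should switch the LED on and `set_led(False)` should switch it off. Because the broker is public, anyone publishing to the same topic will also toggle the LED.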

21.5 Publish text

#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient); 

const char* ssid = "wifi_name";
const char* password = "wifi_password";

const char* mqttServer = "broker.hivemq.com";
int mqttPort = 1883;

unsigned long previous_time = 0;  // millis() returns unsigned long
const char* messageToSend = "This is my funny message"; // Message as a variable

void setupMQTT() {
  mqttClient.setServer(mqttServer, mqttPort);
}

void reconnect() {
  Serial.println("Connecting to MQTT Broker...");
  while (!mqttClient.connected()) {
    Serial.println("Reconnecting to MQTT Broker..");
    String clientId = "ESP32Client-";
    clientId += String(random(0xffff), HEX);
    
    if (mqttClient.connect(clientId.c_str())) {
      Serial.println("Connected.");
    }      
  }
}

void setup() {
  Serial.begin(115200);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  } 
  Serial.println("");
  Serial.println("Connected to Wi-Fi");

  setupMQTT();
}

void loop() {
  if (!mqttClient.connected())
    reconnect();
  mqttClient.loop();
  unsigned long now = millis();  // unsigned arithmetic stays correct when millis() wraps around

  if (now - previous_time > 5000) {
    previous_time = now;
    mqttClient.publish("agrotech/mqtt_tutorial/message", messageToSend);
  }
}

21.6 Sensor subscribe

// adapted from:
//https://microcontrollerslab.com/esp32-mqtt-client-publish-subscribe-bme280-readings-hivemq/

#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient); 

const char* ssid = "wifi_name";
const char* password = "wifi_password";

const char* mqttServer = "broker.hivemq.com";  // string literals must be pointed to by const char*
int mqttPort = 1883;

float value = 0;
String msgString; // Declare msgString variable in the global scope

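// Forward declaration: callback() is defined at the end of the file.
// (The Arduino IDE generates this for .ino files; other build systems do not.)
void callback(char* topic, byte* message, unsigned int length);

void setupMQTT() {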
  mqttClient.setServer(mqttServer, mqttPort);
  mqttClient.setCallback(callback);
}

void reconnect() {
  Serial.println("Connecting to MQTT Broker...");
  while (!mqttClient.connected()) {
      Serial.println("Reconnecting to MQTT Broker..");
      String clientId = "ESP32Client-";
      clientId += String(random(0xffff), HEX);
      
      if (mqttClient.connect(clientId.c_str())) {
        Serial.println("Connected.");
        // subscribe to topic
        mqttClient.subscribe("agrotech/mqtt_tutorial/temperature");
      }      
  }
}

void setup() {
  Serial.begin(115200);
  WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
      delay(500);
      Serial.print(".");
    } 
    Serial.println("");
     Serial.println("Connected to Wi-Fi");

  setupMQTT();

}


void loop() {
  if (!mqttClient.connected()) {
      reconnect();
  }
    
  mqttClient.loop();  // process incoming messages and keep the connection alive
}



void callback(char* topic, byte* message, unsigned int length) {
  // Convert byte array to string
  msgString = String((char*)message, length);

  // Convert string to float
  value = msgString.toFloat();

  Serial.println(value);

}

21.7 Publish and subscribe

// adapted from:
//https://microcontrollerslab.com/esp32-mqtt-client-publish-subscribe-bme280-readings-hivemq/

#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient); 

const char* ssid = "wifi_name";
const char* password = "wifi_password";

const char* mqttServer = "broker.hivemq.com";  // string literals must be pointed to by const char*
int mqttPort = 1883;

unsigned long previous_time = 0;  // millis() returns unsigned long
String msgString; // Declare msgString variable in the global scope

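// Forward declaration: callback() is defined at the end of the file.
// (The Arduino IDE generates this for .ino files; other build systems do not.)
void callback(char* topic, byte* message, unsigned int length);

void setupMQTT() {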
  mqttClient.setServer(mqttServer, mqttPort);
  mqttClient.setCallback(callback);
}

void reconnect() {
  Serial.println("Connecting to MQTT Broker...");
  while (!mqttClient.connected()) {
      Serial.println("Reconnecting to MQTT Broker..");
      String clientId = "ESP32Client-";
      clientId += String(random(0xffff), HEX);
      
      if (mqttClient.connect(clientId.c_str())) {
        Serial.println("Connected.");
        // subscribe to topic
        mqttClient.subscribe("agrotech/2023/message");
      }      
  }
}

void setup() {
  Serial.begin(115200);
  WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
      delay(500);
      Serial.print(".");
    } 
    Serial.println("");
     Serial.println("Connected to Wi-Fi");

  setupMQTT();
}


void loop() {
  if (!mqttClient.connected())
    reconnect();
  mqttClient.loop();
  unsigned long now = millis();  // unsigned arithmetic stays correct when millis() wraps around

  if (now - previous_time > 1000) {
    previous_time = now;

    float temperature = random(1, 101);
    char tempString[8];
    dtostrf(temperature, 1, 2, tempString);
    Serial.print("Temperature: ");
    Serial.println(tempString);
    mqttClient.publish("agrotech/2023/temperature", tempString);

    mqttClient.publish("agrotech/2023/string", "Hi :)");

//    Serial.print("Subscribed Message: ");
//    Serial.println(msgString);
  }
}


void callback(char* topic, byte* message, unsigned int length) {
  // Convert byte array to string
 //  String msgString((char*)message, length);

  msgString = String((char*)message, length);
  
  // Convert string to float
  float msgFloat = msgString.toFloat();

  // Convert string to integer
  int msgInt = msgString.toInt();
  
  Serial.print("Message: ");
  Serial.println(msgString);
  Serial.print("Float: ");
  Serial.println(msgFloat);
  Serial.print("Integer: ");
  Serial.println(msgInt);
}

21.8 MQTT + Python

21.9 Subscribe

import paho.mqtt.client as mqtt

# MQTT broker details
broker_address = "broker.hivemq.com"
broker_port = 1883

# Callback function when a connection is established with the MQTT broker
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code: " + str(rc))
    # Subscribe to the topic upon successful connection
    client.subscribe("agrotech/mqtt_tutorial/#")

# Callback function when a message is received
def on_message(client, userdata, msg):
    print("Received message: " + str(msg.payload.decode()))

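# Create an MQTT client instance.
# The callbacks above use the paho-mqtt 1.x signatures; with paho-mqtt 2.x, create the
# client as mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) to keep these signatures.
client = mqtt.Client()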

# Assign callback functions
client.on_connect = on_connect
client.on_message = on_message

# Connect to the MQTT broker
client.connect(broker_address, broker_port, 60)

# Loop to maintain the connection and process network traffic
client.loop_forever()
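The result code passed to on_connect is the return code of the broker's MQTT 3.1.1 CONNACK reply, and only 0 means the connection was accepted. The sketch below turns the code into a readable message and subscribes only on success. The helper name `describe_rc` is invented here, and the callback keeps the same four-argument signature used in the listing above.

```python
# Return codes defined by MQTT 3.1.1 for the CONNACK packet.
CONNACK_MEANINGS = {
    0: "connection accepted",
    1: "refused: unacceptable protocol version",
    2: "refused: client identifier rejected",
    3: "refused: server unavailable",
    4: "refused: bad user name or password",
    5: "refused: not authorized",
}


def describe_rc(rc: int) -> str:
    """Translate a CONNACK return code into a readable message."""
    return CONNACK_MEANINGS.get(rc, f"unknown return code {rc}")


def on_connect(client, userdata, flags, rc):
    print("Broker replied:", describe_rc(rc))
    if rc == 0:
        # Subscribe only after the broker has accepted the connection.
        client.subscribe("agrotech/mqtt_tutorial/#")
```

Subscribing inside on_connect also means the subscription is renewed automatically whenever the client reconnects.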

21.10 Publish

import paho.mqtt.client as mqtt
import random
import time

# MQTT broker details
broker_address = "broker.hivemq.com"
broker_port = 1883

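# Create an MQTT client instance
client = mqtt.Client()

# Connect to the MQTT broker
client.connect(broker_address, broker_port, 60)
# Run the network loop in a background thread so the connection is serviced while we publish
client.loop_start()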

temperature = 20.0
# Loop to publish random temperature readings
while True:
    temperature = temperature + random.uniform(-1, 1)  # Random walk: change the previous value by at most 1 degree
    client.publish("agrotech/mqtt_tutorial/temperature", str(temperature))  # Publish the temperature value to the topic
    print("Published temperature: " + str(temperature))
    time.sleep(1)  # Wait for 1 second before publishing the next temperature reading

# Disconnect from the MQTT broker (not reached in this example)
client.disconnect()

21.11 Subscribe to CSV

import paho.mqtt.client as mqtt
import pandas as pd
from datetime import datetime

# MQTT broker details
broker_address = "broker.hivemq.com"
broker_port = 1883

# Create a DataFrame to store the messages
df = pd.DataFrame(columns=["topic", "value"])

# Callback function when a connection is established with the MQTT broker
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code: " + str(rc))
    # Subscribe to all subtopics within agrotech/mqtt_tutorial/
    client.subscribe("agrotech/mqtt_tutorial/#")

# Callback function when a message is received
def on_message(client, userdata, msg):
    global df
    # Decode the message payload
    value = msg.payload.decode()
    # Extract the subtopic
    subtopic = msg.topic.split("agrotech/mqtt_tutorial/")[1]
    # Get the current time
    timestamp = datetime.now()
    # Create a new DataFrame with the new message
    new_row = pd.DataFrame({"topic": [subtopic], "value": [value]}, index=[timestamp])
    # Append the new row to the DataFrame using concat
    df = pd.concat([df, new_row])
    print(f"Received message on {msg.topic}: {value}")

# Create an MQTT client instance
client = mqtt.Client()

# Assign callback functions
client.on_connect = on_connect
client.on_message = on_message

# Connect to the MQTT broker
client.connect(broker_address, broker_port, 60)

# Loop to maintain the connection and process network traffic
try:
    client.loop_forever()
except KeyboardInterrupt:
    # Save the DataFrame to a CSV file when the script is interrupted
    df.to_csv("mqtt_messages.csv")
    print("Data saved to mqtt_messages.csv")
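The listing above rebuilds the DataFrame with pd.concat on every message, which copies all earlier rows each time. For long recordings, one alternative is to collect plain dictionaries in a list and write them out once at the end. The sketch below illustrates this with the standard csv module; the names `record` and `write_csv` and the sample timestamps are made up for the example, and in a real script `record` would be called from on_message.

```python
import csv
import io
from datetime import datetime

PREFIX = "agrotech/mqtt_tutorial/"
rows = []  # one dictionary per received message


def record(topic: str, payload: bytes, when: datetime) -> None:
    """Store one message; appending to a list is cheap."""
    # Strip the common prefix, but keep the full topic if it does not match.
    subtopic = topic[len(PREFIX):] if topic.startswith(PREFIX) else topic
    rows.append({"time": when.isoformat(), "topic": subtopic, "value": payload.decode()})


def write_csv(out) -> None:
    """Write all stored messages to an open text file (or file-like object)."""
    writer = csv.DictWriter(out, fieldnames=["time", "topic", "value"])
    writer.writeheader()
    writer.writerows(rows)


# Illustrative messages with made-up timestamps.
record("agrotech/mqtt_tutorial/temperature", b"21.4", datetime(2023, 6, 1, 12, 0, 0))
record("agrotech/mqtt_tutorial/LED", b"1", datetime(2023, 6, 1, 12, 0, 5))

buffer = io.StringIO()
write_csv(buffer)
print(buffer.getvalue())
```

To save to disk instead of an in-memory buffer, open the file with `open("mqtt_messages.csv", "w", newline="")` and pass it to `write_csv`.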

21.12 Control irrigation solenoid in the greenhouse

Use the code below to control the irrigation solenoid outside the greenhouse.

Code usage: connect a jumper wire to pin A4. While you touch the jumper, the solenoid is open; when you let go, it closes.

#include <WiFi.h>
#include <PubSubClient.h>

// WiFi credentials
const char* ssid = "agrotech";          // Replace with your WiFi SSID
const char* password = "1Afuna2gezer";  // Replace with your WiFi Password

// MQTT broker details
const char* mqtt_server = "192.168.0.102";    // IP address of Home Assistant
const int mqtt_port = 1883;                   // Default MQTT port
const char* mqtt_user = "mqtt-user";          // MQTT username
const char* mqtt_password = "1234";           // MQTT password
const char* mqtt_topic = "/greenhouse/outside/irrigation/solenoid5"; // MQTT topic

// Touch pin configuration
#define TOUCH_PIN A4  // Touch input; the GPIO behind A4 is board-specific and must be touch-capable

// MQTT client and WiFi client
WiFiClient espClient;
PubSubClient client(espClient);

// State variable to track touch pin state
bool send_message = false;

void setup_wifi() {
  Serial.print("Connecting to WiFi: ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("\nWiFi connected");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void reconnect() {
  // Attempt to reconnect to the MQTT broker
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
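    // Use a unique client ID: a broker disconnects an existing client when another connects with the same ID
    String clientId = "ESP32Client-" + String(random(0xffff), HEX);
    if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) {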
      Serial.println("connected");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" Retrying in 5 seconds...");
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);

  // Initialize WiFi
  setup_wifi();

  // Set up MQTT
  client.setServer(mqtt_server, mqtt_port);

  // Ensure the ESP32 starts connected to the MQTT broker
  reconnect();
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // Read the state of the touch pin
  bool is_touched = touchRead(TOUCH_PIN) < 40;  // If the value is less than 40, the pin is being touched

  // Send "1" if the pin is being touched
  if (is_touched) {
    if (!send_message) {
      client.publish(mqtt_topic, "1");
      Serial.println("Sent: 1");
      send_message = true;  // Set flag to avoid sending multiple times while touching
    }
  } 
  // Send "0" if the pin is not being touched
  else {
    if (send_message) {
      client.publish(mqtt_topic, "0");
      Serial.println("Sent: 0");
      send_message = false;  // Reset the flag when pin is no longer touched
    }
  }

  // Optional: Small delay to avoid flooding the MQTT server with messages
  delay(100);  // Adjust the delay if necessary
}
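The send_message flag in the loop above is a form of edge detection: a message goes out only when the touch state changes, not on every pass through the loop. The sketch below replays the same logic on a list of readings so it can be checked without hardware. The function name `messages_for` and the sample readings are invented for this illustration.

```python
TOUCH_THRESHOLD = 40  # same threshold as in the ESP32 code above


def messages_for(readings):
    """Return the payloads that would be published for a sequence of touch readings."""
    sent = []
    touched_before = False  # plays the role of the send_message flag
    for reading in readings:
        touched = reading < TOUCH_THRESHOLD
        if touched != touched_before:
            # State changed: publish "1" on touch, "0" on release.
            sent.append("1" if touched else "0")
            touched_before = touched
    return sent


# Eight loop iterations, but only three state changes.
print(messages_for([70, 68, 12, 10, 11, 72, 71, 9]))  # ['1', '0', '1']
```

Without the flag, every loop iteration would publish a message, roughly ten per second given the 100 ms delay.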