File mqtt.cpp¶
Location: src/mqtt.cpp
Includes¶
graph LR
2["mqtt.h"]
click 2 "mqtt_8h.md#mqtt_8h"
2 --> 3
3["platform.h"]
click 3 "platform_8h.md#platform_8h"
3 --> 4
3 --> 5
3 --> 6
3 --> 7
3 --> 8
3 --> 9
3 --> 10
11["storage.h"]
click 11 "storage_8h.md#storage_8h"
11 --> 3
11 --> 12
1["src/mqtt.cpp"]
click 1 "mqtt_8cpp.md#mqtt_8cpp"
1 --> 2
1 --> 11
8["Adafruit_ADT7410.h"]
4["Arduino.h"]
9["ArduinoJson.h"]
10["ArduinoMqttClient.h"]
7["RTClib.h"]
6["SdFat.h"]
5["Wire.h"]
12["cstdio"]
Variables¶
Variable SMALL_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 9)
Buffer size for small MQTT topics, payloads, and JSON documents.
Type: const size_t
Variable LARGE_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 11)
Buffer size for large MQTT payloads and JSON documents (recovery data)
Type: const size_t
Variable FILE_NAME_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 12)
Type: const size_t
Variable MAX_RECOVERY_FILES_PER_LOOP¶
Definition: src/mqtt.cpp
(line 13)
Type: const int
Variable FOLDER_NAME_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 19)
Type: const size_t
Variable FULL_PATH_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 20)
Type: const size_t
Variable LINE_BUFFER_SIZE¶
Definition: src/mqtt.cpp
(line 22)
Buffer size for reading individual CSV lines.
Type: const size_t
Variable SECONDS_IN_24_HOURS¶
Definition: src/mqtt.cpp
(line 23)
Type: const uint32_t
Variable RECOVERY_TIMEOUT_MS¶
Definition: src/mqtt.cpp
(line 25)
Timeout for recovery operations in milliseconds (60 seconds)
Type: const unsigned long
Variable ACK_TIMEOUT_MS¶
Definition: src/mqtt.cpp
(line 26)
Type: const unsigned long
Variable RECOVERY_ACK_TIMEOUT_MS¶
Definition: src/mqtt.cpp
(line 27)
Type: const unsigned long
Variable DELAY_POLLING_LOOP_MS¶
Definition: src/mqtt.cpp
(line 28)
Type: const unsigned long
Variable s_ackSeen¶
Definition: src/mqtt.cpp
(line 56)
Type: volatile bool
Variable s_ackSeq¶
Definition: src/mqtt.cpp
(line 57)
Type: volatile long
Variable s_pubTopic¶
Definition: src/mqtt.cpp
(line 58)
Type: String
Variable s_ackInit¶
Definition: src/mqtt.cpp
(line 59)
Type: bool
Functions¶
Function ExtractSequence¶
Extracts the sequence number from a JSON string.
Searches for the "sequence" field in the provided JSON and parses its value. Returns false if the field is missing or set to null.
Parameters:
- json: The JSON string to search
- outSeq: Reference to store the extracted sequence number
Returns:
true if a valid sequence number was found, false otherwise
Parameters:
- const char * json
- long & outSeq
Return type: bool
Function OnMqttEchoMessage¶
MQTT message callback to detect PUBACK/echo for published messages.
Processes incoming MQTT messages, filtering by MQTT_TOPIC and retain flag. If the message is an echo for the current publish MQTT_TOPIC and not retained, extracts the sequence number and sets acknowledgment flags.
Parameters:
- messageSize: Size of the incoming message (needed because of the MQTT library's callback interface)
Parameters:
- int messageSize
Return type: void
Function EnsureAckInit¶
static void EnsureAckInit(MqttClient &client, const char *topicPrefix, const char *sensorType, const char *sensorId)
Initializes ACK/Echo handling and subscribes to the publish MQTT_TOPIC.
Sets up the publish MQTT_TOPIC and registers the MQTT message callback for echo detection. Ensures the callback is registered only once, and re-subscribes to the MQTT_TOPIC after each reconnect.
Parameters:
- client: Reference to the MQTT client
- topicPrefix: Topic prefix for MQTT publishing
- sensorType: Sensor type string
- sensorId: Unique sensor identifier
Parameters:
- MqttClient & client
- const char * topicPrefix
- const char * sensorType
- const char * sensorId
Return type: void
Function CreateFullTopic¶
void CreateFullTopic(char *buffer, size_t bufferSize, const char *topicPrefix, const char *sensorType, const char *sensorId, const char *suffix)
Parameters:
- char * buffer
- size_t bufferSize
- const char * topicPrefix
- const char * sensorType
- const char * sensorId
- const char * suffix
Return type: void
Function SendTempToMqtt¶
bool SendTempToMqtt(MqttClient &mqttClient, const char *topicPrefix, const char *sensorType, const char *sensorId, float celsius, const DateTime &now, int sequence)
Publishes real-time sensor data to the MQTT broker with QoS 1 delivery.
This function builds a JSON payload from the provided sensor data and publishes it to the specified MQTT topic. After publishing, it waits briefly for a PUBACK handshake from the broker to confirm delivery. If no acknowledgment is received within the timeout window, the data is saved to a CSV file for later recovery.
Parameters:
- mqttClient: Reference to the MQTT client instance
- topicPrefix: Topic prefix for MQTT publishing (e.g., "dhbw/ai/si2023/2/")
- sensorType: Sensor type string (e.g., "temp")
- sensorId: Unique sensor identifier
- celsius: Measured temperature value in Celsius
- now: Current timestamp (DateTime)
- sequence: Sequence number for the measurement
Returns:
true if published and acknowledged by broker, false if fallback to CSV
?> Uses QoS 1 for reliable delivery. If broker does not echo/PUBACK within the timeout, data is persisted for later transmission.
Parameters:
- MqttClient & mqttClient
- const char * topicPrefix
- const char * sensorType
- const char * sensorId
- float celsius
- const DateTime & now
- int sequence
Return type: bool
Function SendPendingDataToMqtt¶
bool SendPendingDataToMqtt(MqttClient &mqttClient, const char *topicPrefix, const char *sensorType, const char *sensorId, const DateTime &now)
Processes and transmits pending CSV files from offline periods to the MQTT broker.
This function scans the SD card for CSV files containing unsent sensor data from previous offline periods. Each file is converted to a JSON payload and published to the MQTT topic
Parameters:
- mqttClient: Reference to the MQTT client instance
- topicPrefix: Topic prefix for MQTT publishing (e.g., "dhbw/ai/si2023/2/")
- sensorType: Sensor type string (e.g., "temp")
- sensorId: Unique sensor identifier
- now: Current timestamp (DateTime)
Returns:
true if all valid files were published and deleted, false if any files remain or errors occurred
?> Uses QoS 1 for reliable delivery. Skips files older than 24 hours or with invalid content. Aborts if recovery exceeds time limit.
Parameters:
- MqttClient & mqttClient
- const char * topicPrefix
- const char * sensorType
- const char * sensorId
- const DateTime & now
Return type: bool
Source¶
#include "mqtt.h"
#include "storage.h"
// =============================================================================
// BUFFER SIZE CONSTANTS
// =============================================================================
static const size_t SMALL_BUFFER_SIZE = 128;
static const size_t LARGE_BUFFER_SIZE = 2048;
static const size_t FILE_NAME_BUFFER_SIZE = 64;
static const int MAX_RECOVERY_FILES_PER_LOOP = 3;
// =============================================================================
// FILE SYSTEM AND TIMING CONSTANTS
// =============================================================================
static const size_t FOLDER_NAME_BUFFER_SIZE = 8;
static const size_t FULL_PATH_BUFFER_SIZE = 64;
static const size_t LINE_BUFFER_SIZE = 64;
static const uint32_t SECONDS_IN_24_HOURS = 86400;
static const unsigned long RECOVERY_TIMEOUT_MS = 60000;
static const unsigned long ACK_TIMEOUT_MS = 5000;
static const unsigned long RECOVERY_ACK_TIMEOUT_MS = 10000;
static const unsigned long DELAY_POLLING_LOOP_MS = 10;
// =============================================================================
// ACK/ECHO-HANDLING
// =============================================================================
static volatile bool s_ackSeen = false;
static volatile long s_ackSeq = -1;
static String s_pubTopic; // z. B. "<prefix>temp/Sensor_Two"
static bool s_ackInit = false;
static bool ExtractSequence(const char* json, long& outSeq) {
const char* p = strstr(json, "\"sequence\":");
if (!p) return false;
p += 11; // length of "\"sequence\":"
// Skip whitespace
while (*p == ' ' || *p == '\t') ++p;
// Support null (Recovery may have sequence:null)
if (strncmp(p, "null", 4) == 0) return false;
outSeq = strtol(p, nullptr, 10);
return true;
}
static void OnMqttEchoMessage(int messageSize) {
(void)messageSize;
if (mqttClient.messageTopic() != s_pubTopic) return;
if (mqttClient.messageRetain()) return;
static char buf[SMALL_BUFFER_SIZE * 2];
int n = 0;
while (mqttClient.available() && n < (int)sizeof(buf) - 1) {
buf[n++] = mqttClient.read();
}
buf[n] = 0;
long seq;
if (ExtractSequence(buf, seq)) {
s_ackSeq = seq;
s_ackSeen = true;
}
}
static void EnsureAckInit(MqttClient& client, const char* topicPrefix, const char* sensorType, const char* sensorId) {
if (!s_ackInit) {
char fullTopic[SMALL_BUFFER_SIZE];
if (sensorType && sensorId) {
snprintf(fullTopic, sizeof(fullTopic), "%s%s/%s", topicPrefix, sensorType, sensorId);
s_pubTopic = fullTopic;
s_ackInit = true;
} else {
return;
}
client.onMessage(OnMqttEchoMessage); // Callback register
}
if (client.connected()) {
client.subscribe(s_pubTopic.c_str());
}
}
// =============================================================================
void CreateFullTopic(char* buffer, size_t bufferSize, const char* topicPrefix,
const char* sensorType, const char* sensorId, const char* suffix) {
if (suffix && strlen(suffix) > 0) {
snprintf(buffer, bufferSize, "%s%s/%s/%s", topicPrefix, sensorType, sensorId, suffix);
} else {
snprintf(buffer, bufferSize, "%s%s/%s", topicPrefix, sensorType, sensorId);
}
}
// =============================================================================
// REAL-TIME DATA TRANSMISSION FUNCTIONS
// =============================================================================
bool SendTempToMqtt(MqttClient& mqttClient, const char* topicPrefix, const char* sensorType,
const char* sensorId, float celsius, const DateTime& now, int sequence) {
EnsureAckInit(mqttClient, topicPrefix, sensorType, sensorId);
mqttClient.poll();
char fullTopic[SMALL_BUFFER_SIZE];
CreateFullTopic(fullTopic, sizeof(fullTopic), topicPrefix, sensorType, sensorId);
StaticJsonDocument<SMALL_BUFFER_SIZE> jsonDoc;
BuildJson(jsonDoc, celsius, now, sequence);
char payload[SMALL_BUFFER_SIZE];
serializeJson(jsonDoc, payload, sizeof(payload));
// Reset ACK-Flags
s_ackSeen = false;
s_ackSeq = -1;
if (mqttClient.beginMessage(fullTopic, false, 1)) {
mqttClient.print(payload);
if (!mqttClient.endMessage()) {
Serial.println("MQTT endMessage() failed → saving to CSV.");
SaveTempToBatchCsv(now, celsius, sequence);
return false;
}
// delay for a short window to allow poll() to process the PUBACK/echo
unsigned long startTime = millis();
bool ackOk = false;
while (millis() - startTime < ACK_TIMEOUT_MS) {
mqttClient.poll();
if (s_ackSeen && s_ackSeq == sequence) {
ackOk = true;
break;
}
delay(DELAY_POLLING_LOOP_MS);
}
if (!ackOk) {
Serial.println("No Echo/PUBACK within timeout → saving to CSV.");
SaveTempToBatchCsv(now, celsius, sequence);
return false;
}
Serial.print("Published to ");
Serial.println(fullTopic);
Serial.println(payload);
return true;
} else {
Serial.println("MQTT beginMessage() failed → saving to CSV.");
SaveTempToBatchCsv(now, celsius, sequence);
return false;
}
}
// =============================================================================
// DATA RECOVERY AND OFFLINE TRANSMISSION FUNCTIONS
// =============================================================================
bool SendPendingDataToMqtt(MqttClient& mqttClient, const char* topicPrefix, const char* sensorType,
const char* sensorId, const DateTime& now) {
Serial.println("Looking for pending CSV files...");
// Track processing time to prevent infinite loops
const unsigned long startMillis = millis();
bool allFilesSent = true;
// Open the current date folder
char folder[FOLDER_NAME_BUFFER_SIZE];
strncpy(folder, CreateFolderName(now), sizeof(folder));
File root = sd.open(folder);
if (!root) {
Serial.println("No folder found for pending data.");
return true;
}
// Initialize processing counters
int sentCount = 0;
int checkedFiles = 0;
int skippedEmptyFiles = 0;
File entry;
while ((entry = root.openNextFile())) {
if (entry.isDirectory()) continue;
char filename[FILE_NAME_BUFFER_SIZE];
entry.getName(filename, sizeof(filename));
entry.close();
String nameStr(filename);
if (!nameStr.endsWith(".csv")) continue;
checkedFiles++;
// Validate file age (skip files older than 24 hours)
char fullPath[FULL_PATH_BUFFER_SIZE];
snprintf(fullPath, sizeof(fullPath), "%s/%s", folder, filename);
File tsFile = sd.open(fullPath, FILE_READ);
if (tsFile) {
char line[LINE_BUFFER_SIZE];
if (tsFile.fgets(line, sizeof(line)) > 0) {
char* p = strtok(line, ",");
if (p) {
Serial.print("Malformed CSV line (no timestamp): ");
Serial.println(line);
uint32_t ts = atol(p);
if (now.unixtime() - ts > SECONDS_IN_24_HOURS) {
Serial.print("Skipping old CSV file (>24h): ");
Serial.println(nameStr);
tsFile.close();
continue;
}
}
}
tsFile.close();
}
// Convert CSV content to JSON format
StaticJsonDocument<LARGE_BUFFER_SIZE> doc;
BuildRecoveryJsonFromBatchCsv(doc, fullPath, now);
// Validate that the file contains usable data
if (!doc["meta"].is<JsonObject>() || doc["meta"].size() == 0) {
Serial.println("No valid data in: " + nameStr);
skippedEmptyFiles++;
continue;
}
// Serialize JSON and check payload size
char payload[LARGE_BUFFER_SIZE];
size_t len = serializeJson(doc, payload, sizeof(payload));
if (len >= sizeof(payload)) {
Serial.println("Payload too large, skipping file: " + nameStr);
allFilesSent = false;
continue;
}
char fullTopic[SMALL_BUFFER_SIZE];
CreateFullTopic(fullTopic, sizeof(fullTopic), topicPrefix, sensorType, sensorId, "recovered");
Serial.print("Publishing recovered CSV: ");
Serial.println(nameStr);
Serial.print("MQTT payload: ");
Serial.println(payload);
bool published = false;
if (mqttClient.beginMessage(fullTopic, false, 1)) {
mqttClient.print(payload);
if (mqttClient.endMessage()) {
// wait for echo/PUBACK handshake
unsigned long startTime = millis();
while (millis() - startTime < RECOVERY_ACK_TIMEOUT_MS) {
mqttClient.poll();
delay(DELAY_POLLING_LOOP_MS);
}
published = true;
}
}
if (published) {
Serial.println("Published and deleting file.");
DeleteCsvFile(fullPath);
sentCount++;
} else {
Serial.println("Failed to publish. Keeping file: " + nameStr);
allFilesSent = false;
}
// Check for overall timeout to prevent blocking too long
if (millis() - startMillis > RECOVERY_TIMEOUT_MS) {
Serial.println("Aborting recovery: 60s time limit exceeded.");
allFilesSent = false;
break;
}
}
root.close();
// Provide summary of recovery operation
if (checkedFiles == 0) {
Serial.println("No CSV recovery files found.");
} else if (sentCount == 0 && skippedEmptyFiles == checkedFiles) {
Serial.println("All found recovery files were empty, too old, or invalid.");
} else {
Serial.print("Recovered files sent this loop: ");
Serial.println(sentCount);
}
return allFilesSent;
}