Skip to content

File mqtt.cpp

Location: src/mqtt.cpp

Includes

graph LR
2["mqtt.h"]
click 2 "mqtt_8h.md#mqtt_8h"
2 --> 3

3["platform.h"]
click 3 "platform_8h.md#platform_8h"
3 --> 4
3 --> 5
3 --> 6
3 --> 7
3 --> 8
3 --> 9
3 --> 10

11["storage.h"]
click 11 "storage_8h.md#storage_8h"
11 --> 3
11 --> 12

1["src/mqtt.cpp"]
click 1 "mqtt_8cpp.md#mqtt_8cpp"
1 --> 2
1 --> 11

8["Adafruit_ADT7410.h"]

4["Arduino.h"]

9["ArduinoJson.h"]

10["ArduinoMqttClient.h"]

7["RTClib.h"]

6["SdFat.h"]

5["Wire.h"]

12["cstdio"]

Variables

Variable SMALL_BUFFER_SIZE


Definition: src/mqtt.cpp (line 9)

const size_t SMALL_BUFFER_SIZE = 128

Buffer size for small MQTT topics, payloads, and JSON documents.

Type: const size_t

Variable LARGE_BUFFER_SIZE


Definition: src/mqtt.cpp (line 11)

const size_t LARGE_BUFFER_SIZE = 2048

Buffer size for large MQTT payloads and JSON documents (recovery data)

Type: const size_t

Variable FILE_NAME_BUFFER_SIZE


Definition: src/mqtt.cpp (line 12)

const size_t FILE_NAME_BUFFER_SIZE = 64

Type: const size_t

Variable MAX_RECOVERY_FILES_PER_LOOP


Definition: src/mqtt.cpp (line 13)

const int MAX_RECOVERY_FILES_PER_LOOP = 3

Type: const int

Variable FOLDER_NAME_BUFFER_SIZE


Definition: src/mqtt.cpp (line 19)

const size_t FOLDER_NAME_BUFFER_SIZE = 8

Type: const size_t

Variable FULL_PATH_BUFFER_SIZE


Definition: src/mqtt.cpp (line 20)

const size_t FULL_PATH_BUFFER_SIZE = 64

Type: const size_t

Variable LINE_BUFFER_SIZE


Definition: src/mqtt.cpp (line 22)

const size_t LINE_BUFFER_SIZE = 64

Buffer size for reading individual CSV lines.

Type: const size_t

Variable SECONDS_IN_24_HOURS


Definition: src/mqtt.cpp (line 23)

const uint32_t SECONDS_IN_24_HOURS = 86400

Type: const uint32_t

Variable RECOVERY_TIMEOUT_MS


Definition: src/mqtt.cpp (line 25)

const unsigned long RECOVERY_TIMEOUT_MS = 60000

Timeout for recovery operations in milliseconds (60 seconds)

Type: const unsigned long

Variable ACK_TIMEOUT_MS


Definition: src/mqtt.cpp (line 26)

const unsigned long ACK_TIMEOUT_MS = 5000

Type: const unsigned long

Variable RECOVERY_ACK_TIMEOUT_MS


Definition: src/mqtt.cpp (line 27)

const unsigned long RECOVERY_ACK_TIMEOUT_MS = 10000

Type: const unsigned long

Variable DELAY_POLLING_LOOP_MS


Definition: src/mqtt.cpp (line 28)

const unsigned long DELAY_POLLING_LOOP_MS = 10

Type: const unsigned long

Variable s_ackSeen


Definition: src/mqtt.cpp (line 56)

volatile bool s_ackSeen = false

Type: volatile bool

Variable s_ackSeq


Definition: src/mqtt.cpp (line 57)

volatile long s_ackSeq = -1

Type: volatile long

Variable s_pubTopic


Definition: src/mqtt.cpp (line 58)

String s_pubTopic

Type: String

Variable s_ackInit


Definition: src/mqtt.cpp (line 59)

bool s_ackInit = false

Type: bool

Functions

Function ExtractSequence


static bool ExtractSequence(const char *json, long &outSeq)

Extracts the sequence number from a JSON string.

Searches for the "sequence" field in the provided JSON and parses its value. Returns false if the field is missing or set to null.

Parameters:

  • json: The JSON string to search
  • outSeq: Reference to store the extracted sequence number

Returns:

true if a valid sequence number was found, false otherwise

Parameters:

  • const char * json
  • long & outSeq

Return type: bool

Function OnMqttEchoMessage


static void OnMqttEchoMessage(int messageSize)

MQTT message callback to detect PUBACK/echo for published messages.

Processes incoming MQTT messages, filtering by topic and retain flag. If the message arrives on the current publish topic (s_pubTopic) and is not retained, extracts the sequence number from its payload and sets the acknowledgment flags.

Parameters:

  • messageSize: Size of the incoming message (needed because of the MQTT library's callback interface)

Parameters:

  • int messageSize

Return type: void

Function EnsureAckInit


static void EnsureAckInit(MqttClient &client, const char *topicPrefix, const char *sensorType, const char *sensorId)

Initializes ACK/Echo handling and subscribes to the publish topic.

Sets up the publish topic and registers the MQTT message callback for echo detection. The topic and callback are set up only once; the subscription to the publish topic is re-issued on every call while the client is connected, which also covers the case of a reconnect.

Parameters:

  • client: Reference to the MQTT client
  • topicPrefix: Topic prefix for MQTT publishing
  • sensorType: Sensor type string
  • sensorId: Unique sensor identifier

Parameters:

  • MqttClient & client
  • const char * topicPrefix
  • const char * sensorType
  • const char * sensorId

Return type: void

Function CreateFullTopic

void CreateFullTopic(char *buffer, size_t bufferSize, const char *topicPrefix, const char *sensorType, const char *sensorId, const char *suffix)

Parameters:

  • char * buffer
  • size_t bufferSize
  • const char * topicPrefix
  • const char * sensorType
  • const char * sensorId
  • const char * suffix

Return type: void

Function SendTempToMqtt

bool SendTempToMqtt(MqttClient &mqttClient, const char *topicPrefix, const char *sensorType, const char *sensorId, float celsius, const DateTime &now, int sequence)

Publishes real-time sensor data to the MQTT broker with QoS 1 delivery.

This function builds a JSON payload from the provided sensor data and publishes it to the specified MQTT topic. After publishing, it polls the client for up to ACK_TIMEOUT_MS (5 seconds), waiting for the message to be echoed back on the subscribed publish topic with a matching sequence number. If no matching echo is received within the timeout window, the data is saved to a CSV file for later recovery.

Parameters:

  • mqttClient: Reference to the MQTT client instance
  • topicPrefix: Topic prefix for MQTT publishing (e.g., "dhbw/ai/si2023/2/")
  • sensorType: Sensor type string (e.g., "temp")
  • sensorId: Unique sensor identifier
  • celsius: Measured temperature value in Celsius
  • now: Current timestamp (DateTime)
  • sequence: Sequence number for the measurement

Returns:

true if published and acknowledged by broker, false if fallback to CSV

?> Uses QoS 1 for reliable delivery. If broker does not echo/PUBACK within the timeout, data is persisted for later transmission.

Parameters:

  • MqttClient & mqttClient
  • const char * topicPrefix
  • const char * sensorType
  • const char * sensorId
  • float celsius
  • const DateTime & now
  • int sequence

Return type: bool

Function SendPendingDataToMqtt

bool SendPendingDataToMqtt(MqttClient &mqttClient, const char *topicPrefix, const char *sensorType, const char *sensorId, const DateTime &now)

Processes and transmits pending CSV files from offline periods to the MQTT broker.

This function scans the SD card for CSV files containing unsent sensor data from previous offline periods. Each file is converted to a JSON payload and published to the MQTT topic with the suffix /recovered using QoS 1. After publishing, it polls the client for RECOVERY_ACK_TIMEOUT_MS (10 seconds) so that incoming broker traffic is processed; the implementation does not inspect an acknowledgment, so a file counts as sent as soon as the publish call itself succeeds. Files are only deleted if the publish operation succeeds. Files older than 24 hours or with invalid data are skipped.

Parameters:

  • mqttClient: Reference to the MQTT client instance
  • topicPrefix: Topic prefix for MQTT publishing (e.g., "dhbw/ai/si2023/2/")
  • sensorType: Sensor type string (e.g., "temp")
  • sensorId: Unique sensor identifier
  • now: Current timestamp (DateTime)

Returns:

true if all valid files were published and deleted, false if any files remain or errors occurred

?> Uses QoS 1 for reliable delivery. Skips files older than 24 hours or with invalid content. Aborts if recovery exceeds time limit.

Parameters:

  • MqttClient & mqttClient
  • const char * topicPrefix
  • const char * sensorType
  • const char * sensorId
  • const DateTime & now

Return type: bool

Source

#include "mqtt.h"
#include "storage.h"

// =============================================================================
// BUFFER SIZE CONSTANTS
// =============================================================================

/// Buffer size for small MQTT topics, payloads, and JSON documents.
static constexpr size_t SMALL_BUFFER_SIZE = 128;
/// Buffer size for large MQTT payloads and JSON documents (recovery data).
static constexpr size_t LARGE_BUFFER_SIZE = 2048;
/// Buffer size for a directory entry name read while scanning for CSV files.
static constexpr size_t FILE_NAME_BUFFER_SIZE = 64;
/// NOTE(review): not referenced by any code visible in this file — confirm
/// whether a per-call cap on recovered files was intended.
static constexpr int MAX_RECOVERY_FILES_PER_LOOP = 3;

// =============================================================================
// FILE SYSTEM AND TIMING CONSTANTS
// =============================================================================

/// Buffer size for the folder name copied from CreateFolderName().
static constexpr size_t FOLDER_NAME_BUFFER_SIZE = 8;
/// Buffer size for a "<folder>/<filename>" path.
static constexpr size_t FULL_PATH_BUFFER_SIZE = 64;
/// Buffer size for reading individual CSV lines.
static constexpr size_t LINE_BUFFER_SIZE = 64;
/// Age limit, in seconds, beyond which a CSV file is skipped during recovery.
static constexpr uint32_t SECONDS_IN_24_HOURS = 86400;
/// Timeout for recovery operations in milliseconds (60 seconds).
static constexpr unsigned long RECOVERY_TIMEOUT_MS = 60000;
/// Maximum time SendTempToMqtt() waits for the echo of a live publish.
static constexpr unsigned long ACK_TIMEOUT_MS = 5000;
/// Time SendPendingDataToMqtt() keeps polling after a recovery publish.
static constexpr unsigned long RECOVERY_ACK_TIMEOUT_MS = 10000;
/// Pause between two poll() calls inside the waiting loops.
static constexpr unsigned long DELAY_POLLING_LOOP_MS = 10;

// =============================================================================
// ACK/ECHO-HANDLING 
// =============================================================================

/// Set by OnMqttEchoMessage() once an echoed message yielded a sequence number;
/// cleared by SendTempToMqtt() before each publish.
static volatile bool s_ackSeen{false};
/// Sequence number taken from the most recent echo (-1 = none yet).
static volatile long s_ackSeq{-1};
/// Topic that is published to and subscribed for echoes,
/// e.g. "<prefix>temp/Sensor_Two".
static String s_pubTopic;
/// True once s_pubTopic has been built by EnsureAckInit().
static bool s_ackInit{false};

/**
 * @brief Extracts the numeric "sequence" field from a JSON string.
 *
 * Searches for the literal key `"sequence":` (no whitespace between the key
 * and the colon), skips spaces/tabs after the colon and parses a base-10
 * integer. This is a plain text scan, not a JSON parser.
 *
 * @param json   NUL-terminated JSON text to search; nullptr is treated as
 *               "field not found".
 * @param outSeq Receives the parsed sequence number. Left untouched whenever
 *               the function returns false.
 * @return true if a numeric sequence value was parsed; false if the field is
 *         missing, is `null`, or does not start with a number.
 */
static bool ExtractSequence(const char* json, long& outSeq) {
  static const char kKey[] = "\"sequence\":";

  if (!json) return false;

  const char* p = strstr(json, kKey);
  if (!p) return false;
  p += sizeof(kKey) - 1;  // step over the key and the colon

  // Skip whitespace between the colon and the value
  while (*p == ' ' || *p == '\t') ++p;

  // Recovery payloads may carry "sequence":null
  if (strncmp(p, "null", 4) == 0) return false;

  // strtol() reports "nothing parsed" by leaving end == p; without this check
  // a non-numeric value would be accepted as sequence 0.
  char* end = nullptr;
  const long value = strtol(p, &end, 10);
  if (end == p) return false;

  outSeq = value;
  return true;
}

static void OnMqttEchoMessage(int messageSize) {
  (void)messageSize;
  if (mqttClient.messageTopic() != s_pubTopic) return;
  if (mqttClient.messageRetain()) return;

  static char buf[SMALL_BUFFER_SIZE * 2];
  int n = 0;
  while (mqttClient.available() && n < (int)sizeof(buf) - 1) {
    buf[n++] = mqttClient.read();
  }
  buf[n] = 0;

  long seq;
  if (ExtractSequence(buf, seq)) {
    s_ackSeq  = seq;
    s_ackSeen = true;
  }
}

/**
 * @brief Prepares echo-based acknowledgment handling and subscribes to the
 *        publish topic.
 *
 * On the first successful call the publish topic
 * "<topicPrefix><sensorType>/<sensorId>" is stored in s_pubTopic and
 * OnMqttEchoMessage is registered as the client's message callback. On every
 * call, if the client is connected, the topic is (re-)subscribed.
 *
 * NOTE(review): the callback is registered on `client`, but
 * OnMqttEchoMessage() reads from the separately declared `mqttClient`
 * object — confirm both always refer to the same client.
 *
 * @param client      MQTT client to register the callback on and subscribe with
 * @param topicPrefix Topic prefix for MQTT publishing
 * @param sensorType  Sensor type string
 * @param sensorId    Unique sensor identifier
 */
static void EnsureAckInit(MqttClient& client, const char* topicPrefix, const char* sensorType, const char* sensorId) {
  if (!s_ackInit) {
    // All three parts are formatted with "%s"; passing a null pointer there is
    // undefined behavior, so initialization is postponed until all are valid.
    if (!topicPrefix || !sensorType || !sensorId) {
      return;
    }

    char fullTopic[SMALL_BUFFER_SIZE];
    snprintf(fullTopic, sizeof(fullTopic), "%s%s/%s", topicPrefix, sensorType, sensorId);
    s_pubTopic = fullTopic;
    s_ackInit  = true;

    client.onMessage(OnMqttEchoMessage); // register the echo callback once
  }

  if (client.connected()) {
    // Without the subscription no echo can arrive; make a failure visible
    // instead of silently dropping the result.
    if (!client.subscribe(s_pubTopic.c_str())) {
      Serial.println("MQTT subscribe to echo topic failed.");
    }
  }
}

// =============================================================================

/**
 * @brief Builds an MQTT topic string into a caller-provided buffer.
 *
 * The result has the form "<topicPrefix><sensorType>/<sensorId>" or, when a
 * non-empty suffix is given, "<topicPrefix><sensorType>/<sensorId>/<suffix>".
 * No separator is inserted after topicPrefix. Output is truncated to fit
 * bufferSize and is always NUL-terminated.
 *
 * @param buffer      Destination buffer; nothing is written if it is nullptr
 * @param bufferSize  Capacity of buffer in bytes; nothing is written if 0
 * @param topicPrefix Topic prefix (nullptr is treated as an empty string)
 * @param sensorType  Sensor type (nullptr is treated as an empty string)
 * @param sensorId    Sensor identifier (nullptr is treated as an empty string)
 * @param suffix      Optional trailing topic level; nullptr or "" omits it
 */
void CreateFullTopic(char* buffer, size_t bufferSize, const char* topicPrefix,
                     const char* sensorType, const char* sensorId, const char* suffix) {
  if (!buffer || bufferSize == 0) return;

  // "%s" with a null pointer is undefined behavior; substitute empty strings.
  const char* prefix = topicPrefix ? topicPrefix : "";
  const char* type   = sensorType ? sensorType : "";
  const char* id     = sensorId ? sensorId : "";

  if (suffix && suffix[0] != '\0') {
    snprintf(buffer, bufferSize, "%s%s/%s/%s", prefix, type, id, suffix);
  } else {
    snprintf(buffer, bufferSize, "%s%s/%s", prefix, type, id);
  }
}

// =============================================================================
// REAL-TIME DATA TRANSMISSION FUNCTIONS
// =============================================================================

/**
 * @brief Publishes one temperature reading and waits for its echo.
 *
 * Builds a JSON payload via BuildJson(), publishes it with QoS 1 to
 * "<topicPrefix><sensorType>/<sensorId>" and then polls the client for up to
 * ACK_TIMEOUT_MS until OnMqttEchoMessage() reports an echo whose sequence
 * number equals `sequence`. Whenever the reading cannot be confirmed (payload
 * does not fit, publish fails, or no matching echo arrives in time) it is
 * stored through SaveTempToBatchCsv() and false is returned.
 *
 * @param mqttClient  MQTT client instance used for publishing and polling
 * @param topicPrefix Topic prefix for MQTT publishing
 * @param sensorType  Sensor type string
 * @param sensorId    Unique sensor identifier
 * @param celsius     Measured temperature value
 * @param now         Timestamp of the measurement
 * @param sequence    Sequence number of the measurement
 * @return true if the message was published and a matching echo was seen,
 *         false if the reading was written to CSV instead
 */
bool SendTempToMqtt(MqttClient& mqttClient, const char* topicPrefix, const char* sensorType,
                const char* sensorId, float celsius, const DateTime& now, int sequence) {
  EnsureAckInit(mqttClient, topicPrefix, sensorType, sensorId);

  // Process anything already pending before the ACK flags are reset below.
  mqttClient.poll();

  char fullTopic[SMALL_BUFFER_SIZE];
  CreateFullTopic(fullTopic, sizeof(fullTopic), topicPrefix, sensorType, sensorId);

  StaticJsonDocument<SMALL_BUFFER_SIZE> jsonDoc;
  BuildJson(jsonDoc, celsius, now, sequence);

  char payload[SMALL_BUFFER_SIZE];
  const size_t payloadLen = serializeJson(jsonDoc, payload, sizeof(payload));
  // Same guard SendPendingDataToMqtt() applies: a result that fills the whole
  // buffer means the document did not fit, so the text must not be sent.
  if (payloadLen >= sizeof(payload)) {
    Serial.println("MQTT payload too large → saving to CSV.");
    SaveTempToBatchCsv(now, celsius, sequence);
    return false;
  }

  // Reset ACK flags so only an echo of this publish can satisfy the wait.
  s_ackSeen = false;
  s_ackSeq  = -1;

  if (!mqttClient.beginMessage(fullTopic, false, 1)) {
    Serial.println("MQTT beginMessage() failed → saving to CSV.");
    SaveTempToBatchCsv(now, celsius, sequence);
    return false;
  }

  mqttClient.print(payload);
  if (!mqttClient.endMessage()) {
    Serial.println("MQTT endMessage() failed → saving to CSV.");
    SaveTempToBatchCsv(now, celsius, sequence);
    return false;
  }

  // Keep polling for a short window so the callback can see the echo.
  const unsigned long startTime = millis();
  bool ackOk = false;
  while (millis() - startTime < ACK_TIMEOUT_MS) {
    mqttClient.poll();
    if (s_ackSeen && s_ackSeq == sequence) {
      ackOk = true;
      break;
    }
    delay(DELAY_POLLING_LOOP_MS);
  }

  if (!ackOk) {
    Serial.println("No Echo/PUBACK within timeout → saving to CSV.");
    SaveTempToBatchCsv(now, celsius, sequence);
    return false;
  }

  Serial.print("Published to ");
  Serial.println(fullTopic);
  Serial.println(payload);
  return true;
}

// =============================================================================
// DATA RECOVERY AND OFFLINE TRANSMISSION FUNCTIONS
// =============================================================================

bool SendPendingDataToMqtt(MqttClient& mqttClient, const char* topicPrefix, const char* sensorType,
                     const char* sensorId, const DateTime& now) {
  Serial.println("Looking for pending CSV files...");

  // Track processing time to prevent infinite loops
  const unsigned long startMillis = millis();
  bool allFilesSent = true;

  // Open the current date folder
  char folder[FOLDER_NAME_BUFFER_SIZE];
  strncpy(folder, CreateFolderName(now), sizeof(folder));
  File root = sd.open(folder);
  if (!root) {
    Serial.println("No folder found for pending data.");
    return true;
  }

  // Initialize processing counters
  int sentCount = 0;
  int checkedFiles = 0;
  int skippedEmptyFiles = 0;

  File entry;
  while ((entry = root.openNextFile())) {
    if (entry.isDirectory()) continue;

    char filename[FILE_NAME_BUFFER_SIZE];
    entry.getName(filename, sizeof(filename));
    entry.close();

    String nameStr(filename);
    if (!nameStr.endsWith(".csv")) continue;

    checkedFiles++;

    // Validate file age (skip files older than 24 hours)
    char fullPath[FULL_PATH_BUFFER_SIZE];
    snprintf(fullPath, sizeof(fullPath), "%s/%s", folder, filename);
    File tsFile = sd.open(fullPath, FILE_READ);
    if (tsFile) {
      char line[LINE_BUFFER_SIZE];
      if (tsFile.fgets(line, sizeof(line)) > 0) {
        char* p = strtok(line, ",");
        if (p) {
          Serial.print("Malformed CSV line (no timestamp): ");
          Serial.println(line);
          uint32_t ts = atol(p);
          if (now.unixtime() - ts > SECONDS_IN_24_HOURS) {
            Serial.print("Skipping old CSV file (>24h): ");
            Serial.println(nameStr);
            tsFile.close();
            continue;
          }
        }
      }
      tsFile.close();
    }

    // Convert CSV content to JSON format
    StaticJsonDocument<LARGE_BUFFER_SIZE> doc;
    BuildRecoveryJsonFromBatchCsv(doc, fullPath, now);

    // Validate that the file contains usable data
    if (!doc["meta"].is<JsonObject>() || doc["meta"].size() == 0) {
      Serial.println("No valid data in: " + nameStr);
      skippedEmptyFiles++;
      continue;
    }

    // Serialize JSON and check payload size
    char payload[LARGE_BUFFER_SIZE];
    size_t len = serializeJson(doc, payload, sizeof(payload));
    if (len >= sizeof(payload)) {
      Serial.println("Payload too large, skipping file: " + nameStr);
      allFilesSent = false;
      continue;
    }

    char fullTopic[SMALL_BUFFER_SIZE];
    CreateFullTopic(fullTopic, sizeof(fullTopic), topicPrefix, sensorType, sensorId, "recovered");

    Serial.print("Publishing recovered CSV: ");
    Serial.println(nameStr);
    Serial.print("MQTT payload: ");
    Serial.println(payload);

    bool published = false;
    if (mqttClient.beginMessage(fullTopic, false, 1)) {
      mqttClient.print(payload);
      if (mqttClient.endMessage()) {
        // wait for echo/PUBACK handshake
        unsigned long startTime = millis();
        while (millis() - startTime < RECOVERY_ACK_TIMEOUT_MS) {
          mqttClient.poll();
          delay(DELAY_POLLING_LOOP_MS);
        }
        published = true;
      }
    }

    if (published) {
      Serial.println("Published and deleting file.");
      DeleteCsvFile(fullPath);
      sentCount++;
    } else {
      Serial.println("Failed to publish. Keeping file: " + nameStr);
      allFilesSent = false;
    }

    // Check for overall timeout to prevent blocking too long
    if (millis() - startMillis > RECOVERY_TIMEOUT_MS) {
      Serial.println("Aborting recovery: 60s time limit exceeded.");
      allFilesSent = false;
      break;
    }
  }

  root.close();

  // Provide summary of recovery operation
  if (checkedFiles == 0) {
    Serial.println("No CSV recovery files found.");
  } else if (sentCount == 0 && skippedEmptyFiles == checkedFiles) {
    Serial.println("All found recovery files were empty, too old, or invalid.");
  } else {
    Serial.print("Recovered files sent this loop: ");
    Serial.println(sentCount);
  }

  return allFilesSent;
}