Dieses Skript kommuniziert ausschließlich per MQTT. Es sammelt solange Messwerte in einem Datenfeld, bis per MQTT eine Sendeanfrage eintrifft. Dann werden diese Werte gesendet und eine neue Messfolge begonnen.

Inhalt

Datenstruktur

Die MQTT Payload beinhaltet folgende Informationen.

start: Startzeit (Datum, Uhrzeit) des Messbeginns
first: Zeit (Datum, Uhrzeit) zum ersten Messwert
last: Zeit (Datum, Uhrzeit) zum letzten Messwert
ts: Zeitstempel (timestamp) zum ersten Messwert
dt: zeitlicher Abstand zwischen zwei benachbarten Messwerten
values: alle gesammelten Werte in einem Datenfeld

Der erforderliche Schedule Job (Zeitplan) zur regelmäßigen Messwertspeicherung kann bspw. per https://tools.eichelsdoerfer.net/schedjob.html angelegt werden.

Das Skript

/* created by Gerhard Eichelsdörfer, D-55543 Bad Kreuznach  2023-11-28, V 1.0 */

let debug = true;

let Topic = 'ort/autark';
let TopicValues = Topic + '/values';
let TopicGetData = Topic + '/getdata';
let DS = '-'; // date separator
let DTS = 'T'; // separator between date and time
let TS = ':'; // time separator
let MaxVals = 4608; // 16 days, one measurement every 5 minutes or 32 days every 10 minutes
// I don't know if enough memory is available for MaxVals values.

let Temp = {
  n: 0, // number of single values
  val: 0 // average of single values between two storages in Data
};

let Data = {
  start: null, // time at script start
  first: null, // time at first measurement
  last: null, // time at last measurement
  ts: null, // timestamp at first measurement
  dt: null, // time difference between two nearest measurements
  values: [] // array containing all measurements
};

function twoDig(n) {
  n = JSON.stringify(n);
  return n<10 ? '0' + n : n;
}

function genDate(){
  let d = new Date();
  return {
    "t":d.getTime(),
    "d": d.getFullYear() + DS
    + twoDig(d.getMonth() + 1) + DS
    + twoDig(d.getDate()) + DTS
    + twoDig(d.getHours()) + TS
    + twoDig(d.getMinutes()) + TS
    + twoDig(d.getSeconds())};
}

function send() {
  if(Data.values.length===0) return; //do nothing
  let p = JSON.stringify(Data);
  Data.start = genDate().d;
  Data.values = [];
  if(debug) print(p);
  if(MQTT.isConnected()) MQTT.publish(TopicValues, p);
}

function store(){
  if(Temp.n===0 || Data.values.length>=MaxVals) return; // do nothing
  let d = genDate();
  if(Data.values.length===0) {
    Data.first = d.d;
    Data.ts = d.t;
  }
  Data.last = d.d;
  Data.values.push(Math.round(10*Temp.val)/10);
  Temp.n = 0; // start with a new average
  if(debug) print("stored next value in Data:", JSON.stringify(Data));
}

function myEventHandler(event) { // event ist die strukturierte Nachricht
 //print(JSON.stringify(event));
 if(event.component==='temperature:101'){
   let T = event.info.tC; // + Temp.n for testing on static temperature
   Temp.val = (Temp.n * Temp.val + T) / ++(Temp.n); // build the stepwise average
   if(debug) print(Temp.n, ': ', T, ' -> ', Temp.val);
 }
}

MQTT.subscribe(TopicGetData, send); // request to send collected data

Shelly.addEventHandler(myEventHandler);

Data.start = genDate().d;

Shelly.call("Schedule.List", {},
  function (result) {
    //print(JSON.stringify(result.jobs));
    let s = result.jobs;
    let l = s.length;
    if(l>0){
      let id = Shelly.getCurrentScriptId();
      let i = 0;
      for(; i<l && 
        (s[i].calls[0].method!=="script.eval"
          || s[i].calls[0].params.id!==id
          || s[i].calls[0].params.code!=="store()");
        ++i);
      if(i>=l) return; // no job found
      let t = s[i].timespec.split(' ');
      t = t[1].split('/');
      if(t.length!==2) return;
      Data.dt = JSON.parse(t[1]) * 60;
      if(debug) print("found in schedule job dt value:", Data.dt, "s");  
    }
  }
);


Wie arbeitet das Skript?

  1. Bilden eines arithmetischen Mittelwertes
    Es bildet per Eventhandler die eintreffenden Temperaturwerte zwischen zwei lokalen Speicherungen (s.u.) den arithmetischen Mittelwert. Aus wievielen Einzelwerten dieser Mittelwert gebildet wird, hängt vom Abstand zwischen zwei lokalen Speicherungen ab. Die Einzelwerte werden dem Eventhandler in nicht sehr regelmäßigen Abständen zugeführt. Grob liegen diese Abstände oftmals bei ca. 1 Minute. Wenn die Werte alle 5 Minuten lokal gespeichert werden, ergibt sich also der Mittelwert aus ca. 5 Einzelwerten.

  2. Lokale Speicherung eines Mittelwertes
    Ein Zeitplan (Schedule Job), der für den Bedarf passend einzurichten ist, bspw. per https://tools.eichelsdoerfer.net/schedjob.html, ruft zu vom Anwender festgelegten Zeitpunkten bzw. Abständen periodisch genau die Skript interne Funktion store() auf, welche den gebildeten Mittelwert (s.o.) lokal an ein Datenfeld (Array) anfügt. Zu Beginn einer (neuen) Messfolge ist dieses Datenfeld leer. Das Datenfeld liegt in einer Struktur "Data", bestehend aus
    1. start: Datum und Uhrzeit des Messfolgebeginns, menschlich lesbar
    2. first: Datum und Uhrzeit des ersten im Datenfeld abgelegten Wertes, menschlich lesbar
    3. last: Datum und Uhrzeit des aktuell letzten im Datenfeld abgelegten Wertes, menschlich lesbar
    4. ts: Zeitstempel als reiner Zahlenwert (timestamp) des ersten Wertes im Datenfeld, menschlich nicht als Zeitangabe lesbar
    5. dt: Zeitdifferenz in Sekunden zwischen jeweils zwei im Datenfeld benachbarten Werten
    6. values: Datenfeld für die lokal gespeicherten Werte

  3. Übertragen aller gespeicherten Werte bzw. der Datenstruktur "Data"
    Das Skript überträgt Data ausschließlich nach Empfang einer Nachricht, hier per MQTT. Nach Empfang dieser Nachricht sendet das Skript den kompletten Inhalt von Data im JSON Format. Je länger ein zeitlicher Abstände zwischen zwei solcher Sendenachrichten ist, desto länger ist die gesendete Nachricht mit den Messwerten. Nach dem Senden löscht das Skript das Datenfeld, speichert den aktuellen Zeitpunkt in Data.start und beginnt somit mit der nächsten Messfolge.

Die folgende Abbildung zeigt ein Protokoll, welches die Mittelwertbildung über vier im Eventhandler eingetroffenen Messwerte anzeigt. Der vierte Wert ist der letzte Mittelwert, welcher, auf eine Nachkommastelle gerundet, in Data.values (ein Datenfeld) an das bisherige Ende angehängt wird. Dieses Anhängen erfolgt per Funktion store(), die von einem aktiven Schedule Job alle 2 Minuten bzw. 120 Sekunden (siehe dt Wert)  aufgerufen wird. Der Wert von dt wird bei Skriptstart aus dem timespec Wert des Schedule Job ermittelt, weshalb hier keine fehlerträchtige Redundanz besteht.

Dies alles gelingt ausschließlich solange, wie der Shelly die synchrone Zeitinformation besitzt, seine interne Uhr also richtig geht. Das ist nur dann der Fall, wenn nach dem Empfang der Zeitinformation von einem Zeitserver sich keine Unterbrechung der Stromversorgung ereignet. Andernfalls sind die in Data abgelegten Zeitinformationen unvollständig. Das betrifft die Komponenten start, first, last und ts. Das hat zur Folge, dass eine Speicherung in einer Zeitreihendatenbank nicht zweckmäßig ist, weil hierzu auch die Zeitpunkte der einzelnen Messwerte gehören müssen. Die zeitlichen Abstände zwischen zwei Messwerten genügen hierfür nicht. Die Prüfung dieser Zeitinformationen auf Gültigkeit ist in dieser Version noch nicht implementiert.

Für die Speicherung in der Datenbank sind ausschließlich ts und dt erforderlich. Die anderen menschlich lesbaren Werte von Datum und Uhrzeit (start, first und last) können bspw. dafür verwendet werden, zur Kontrolle auf einem Dashboard - oder in einer Node-RED Entwicklungsumgebung (Flow) - angezeigt zu werden. Rein funktional sind diese Zeitwerte nicht erforderlich.

Weiterentwicklung

Ich strebe folgende Eigenschaften an.

  1. Das Skript soll möglichst stabil laufen.
  2. Es sollen möglichst viele Messwerte lokal gespeichert werden können.
  3. Die per MQTT gelieferten Werte sollen auf Empfängerseite so wenig wie möglich verarbeitet werden müssen.
  4. Die Datenübertragung soll zügig stattfinden.

Die Eigenschaften 2 und 4 widersprechen sich - Grund folgt.

Diese Eigenschaften sollen durch folgende Maßnahmen erreicht werden.

zu 1.
Es soll keinen Skriptabbruch durch Speicherüberlauf geben (RAM Grenze überschritten). Dies erfordert eine sorgfältige Planung der Datenstruktur. So verbraucht sicherlich in der Funktion send() der Aufruf JSON.stringify(Data) relativ viel RAM, da der komplette String auf einmal erzeugt und somit abgelegt werden muss. Hierzu denke ich über eine abgewandelte Datenstruktur nach - und darüber, wie diese so zu versenden ist, dass der Garbage Collector Gelegenheit erhält, nicht mehr benötigten RAM Speicher zügig freizugeben.

zu 2.
Wenn die Übertragung wenig speicherfressend abläuft, können entsprechend mehr Messwerte lokal gespeichert werden.

zu 3.
Vor der Datenübertragung sind jedem Messwert der dazugehörige Zeitstempel beizufügen, die dazu verwendete Datenstruktur bedarf noch weiterer Überlegungen und Tests. Folgende Strukturen sind naheliegend.

  • JSON, bspw. {"ts":<Zeitstempelwert>,"temp":<Temperaturwert>}
  • Zweielementiges Datenfeld [<Zeitstempelwert>,<Temperaturwert>]
  • zweielementige Liste <Zeitstempelwert>,<Temperaturwert>

zu 1 bis 3.
Um RAM-Verbrauch einzusparen, sollen die Messwerte in kleineren Containern, wie 100 elementige Datenfelder, gespeichert werden - zusammen mit jeweils einem Zeitstempel zum ersten dort abgelegten Messwert. Dies gestattet die weniger speicherintensive Umwandlung der Daten in Text zum anschließenden senden, weil die Daten in kleineren Paketen sukzessive übertragen werden. Damit der Garbage Collektor Zeit zur Speicherfreigabe erhält, soll zwischen dem Senden der Pakete Wartezeit eingebaut werden. Dies widerspricht der angestrebten Eigenschaft 4. Hier ist also ein Kompromiss zu suchen bzw. per Tests eine optimale, kürzeste Wartezeit zu ermitteln.

Konkretisierung der Maßnahmen

Zunächst ist die Datenstruktur Data zu ändern. Wie dies am besten gelingen kann, unterliegt den Möglichkeiten der Skriptsprache. Trotzdem ist es möglich, die angestrebte Datenstruktur zu erreichen.

  1. Aufteilung des bisher einzigen Datenfeldes values in mehrere gleiche Strukturen aus Zeitstempel und Datenfeld für die zu speichernden Messwerte
    Der Zeitstempel bezieht sich immer auf den ersten Messwert im Datenfeld.
    Hier ist zu prüfen, ob die Skriptsprache das Hinzufügen eines neuen Datenfeldes zur Laufzeit (dynamisch) erlaubt. Andernfalls sind alle vorgesehenen Datenfelder (für je bspw. 100 Messwerte) zum Skriptstart anzulegen. Letztlich ist values nun ein zweidimensionales Datenfeld oder ein Datenfeld aus Referenzen, welche auf jeweils ein Datenfeld verweisen. Wie man diese Struktur interpretiert, ist hier belanglos.
  2. Änderung der Parameter in Data
    first, last, ts entfallen. ts wird nun zusammen mit den einzelnen Datenfeldern verwendet (s.o.). Evtl. wird start nicht benötigt und kann entfallen. Folgende Komponenten sind nun erforderlich.
    • Konstante max für die maximale Länge jedes der Datenfelder, bspw. 100
    • Variable size als aktuelle Anzahl der mit Messwerten belegten Datenfelder bzw. Strukturen aus Zeitstempel und Datenfeld
    • dt als Wert, welcher mit der Abtastrate assoziiert ist, bleibt erhalten.

Das Senden aller Messwerte erfolgt in Paketen zu je bspw. 100 Wertepaaren. Nachdem der Inhalt eines Datenfeldes gesendet wurde, ist dieses Datenfeld zu leeren und vor dem Senden des nächsten Paketes eine gewisse Zeit zu warten - per Timer. So soll der Garbage Collektor (GC) die Gelegenheit erhalten, den nicht mehr benötigten Speicher freizugeben. Ein schlichtes "value[x] = []" gibt den bisher von value[x] belegten Speicher nicht frei, dies muss der GC tun. (value[x] ist das Datenfeld mit Index x in der Datenstruktur Data.)

Auf diese Weise werden die Datenpakete sukzessive übertragen, während quasi zugleich die neu eintreffenden Messwerte im ersten dieser Datenfelder abgelegt werden können ...

Das jeweils zu übertragende Datenpaket ist aus dem einzigen Zeitstempel zum ersten Messwert, dem dt Wert (bzgl. Abtastrate) und den einzelnen Messwerten eines Datenfeldes zusammenzusetzen. Das Verfahren ist trivial. Der Zeitstempel zu jedem Messwert ergibt sich aus dem ersten Zeitstempel + Index*dt. Alternativ kann man auch einen Zeitstempel mitlaufen lassen und den neuen Zeitstempel berechnen per Zeitstempel += dt. So werden alle Wertepaare aus Zeitstempel und Temperatur an den MQTT Broker übertragen und schließlich vom MQTT Subscriber empfangen. Der Subscriber muss hierfür kein Node-RED Knoten sein, es ließe sich auch mosquitto-sub in einem Shellskript hierfür verwenden. Was letztlich für den Empfang eingesetzt wird, hängt vom Anwender ab und dessen bevorzugtem Ziel für die empfangenen Daten. Auch die Struktur der MQTT message bzw. payload (s.o. zu 3.) kann leicht den Wünschen des Anwenders angepasst werden.

Die Anwendungsparameter

Da unbekannt ist, in welchen Zeitabständen etwa der GC RAM freigibt, kann hier nur mit Schätzwerten gearbeitet werden, die dem Anwender zugemutet werden können. Ich verwende hierbei folgende Zusammenstellung als Grundlage.

  1. Übertragung der lokal gespeicherten Messwerte nach jeweils 7 Tagen + 1 Tag Sicherheitsabstand.
  2. Alle 10 Minuten wird ein gemittelter Messwert gespeichert.
  3. Zwischen den Übertragungen einzelner Pakete werden 10s Wartezeit eingelegt.

Dies ergibt folgende Parameter.

Anzahl an maximal zu speichernden Messwerte n = 8 * 24 * 6 =  1152

Da der Wert von n eine Zweierpotenz ist, lege ich Data.max mit 128 fest. Dies ergibt 9 Strukturen aus Zeitstempel und Datenfeld für je bis zu 128 Messwerten.

Somit ergibt sich die maximale Übertragungsdauer für alle Messwerte mit ca. t = (9 - 1) * 10s = 80s. Diese Übertragungsdauer dürfte für den Anwender zumutbar sein. In Feldversuchen ließe sich diese Dauer durch sukzessives verkleinern der Wartedauer verringern, möglicherweise würde gar keine Wartedauer mehr erforderlich sein. Die Wartedauer sollte jedoch nebenbei den Vorteil mit sich bringen, dass die Pakete in der zeitlich passenden Reihenfolge beim Empfänger eintreffen und diesem genügend Zeit dazwischen bleibt, jedes Paket zu verarbeiten. Ein gut zusammengestellter Empfänger (MQTT Subscriber) könnte allerdings an Hand der Zeitstempel die einzelnen Datenpakete sortieren und dann geeignet zusammenfügen bzw. in passender Reihenfolge speichern. Um dies zu erleichtern kann der bisher geplanten Übertragung ein Header hinzugefügt werden, in welchem Parameter wie Anzahl der Pakete, max. Paketgröße ... enthalten sind. Letztlich erwiese sich auch hier der Vorteil des JSON Formats, was aber nicht zwingend ist.

In einer professionellen Variante wären solche Parameter im KVS abzulegen und das Skript sollte diese nach dem Einlesen im Aufbau von Data verwenden. Dies implementiere ich erst, wenn ich dafür finanzielle Mittel erhalte. 😁

Überblick

Es handelt sich um eine FiFo Datenstruktur (first in, first out), was auch als Warteschlange (engl. queue) bekannt ist. Die Elemente dieser FiFo Struktur sind Strukturen aus Zeitstempel und Datenfeld maximaler Größe von bspw. 100. Sowohl beim abspeichern eines (gemittelten) Messwertes als auch beim senden wird jeweils mit dem ersten Element beginnend Richtung letztem (nichtleeren) Element gearbeitet. Man stelle sich einen Schubladenschrank vor, alle Schubladen sind gleich strukturiert. Gefüllt und geleert wird immer mit der untersten Schublade beginnend.

Aktuell

2024-01-30:
Das Skript ist in der neuen Variante in Arbeit. Die Struktur Data steht, wenn auch etwas größer als geplant. Die Speicherung in mehreren Datenfeldern wird getestet, bisher läuft es wie gewünscht. Auch die neue Datenübertragung funktioniert bisher. Die Verarbeitung der Daten auf der Empfängerseite besteht ausschließlich im speichern in einer Textdatei. Das Speichern in einer Influx Datenbank ist gegenwärtig ausgesetzt, da sich die empfangene Datenstruktur geändert hat.

2024-02-02: Neue Erkenntnisse
Jedes Skript erhält ein Speicherkontingent von 25200 (Bytes?). Das ist recht wenig für dieses Projekt. Das Skript läuft stabil mit 3 Strukturen zu je einem Timestamp und bis zu 128 Messwerten. Ich will versuchen, die Anwendung auf mehrere Skripte aufzuteilen - ein "Hauptskript" und mehrere speichernde Skripte, die vom Hauptskript gesteuert werden. Zunächst ist das nur eine Idee.
Mit dem gegenwärtigen Skript lassen sich in 8 Tagen alternativ speichern:
- je 30 Minuten ein Messwert über 24h je Tag
- je 20 Minuten ein Messwert über 16h je Tag
- je 15 Minuten ein Messwert über 12h je Tag

Informationen zur Datenübertragung bereitstellen

Da der Anwender per Smartphone Hotspot dem Shelly zwar den Zugang zum Internet und damit zu einem MQTT Broker bereitstellt bzw. bereitstellen muss, er aber keine Informationen zur erfolgten oder gar ablaufenden Datenübertragung erhält, sehe ich noch einen kleinen Webserver hierfür vor.

Dieser Webserver soll zumindest die Zeitinformation zur letzten kompletten Datenübertragung liefern. So kann der Anwender erkennen, ob die aktuelle Datenübertragung abgeschlossen ist. Hierfür sind immer nach Abschluss einer kompletten Datenübertragung Datum und Uhrzeit oder Zeitstempel im KVS abzulegen. Die vom Skript gelieferte Webseite ist ca. alle 10s zu aktualisieren. Interessant kann auch die Information über die aktuell laufende Übertragung sein, bspw. wieviel Pakete von wievielen insgesamt wurden aktuell bereits übertragen.

Neu

Ich lasse die Webseite weg. Der Anwender kann hierfür die Android App MQTT Dash verwenden. Es stehen drei Kacheln zur Verfügung.
- zum Triggern einer Übertragung (button)
- zum Beobachten des Fortschritts der Übertragung (progress)
- zur Ausgabe von Datum und Uhrzeit der letzten Übertragung (text)

Es zeigte sich, dass test.mosquitto.org die Nachrichten verlässlich überträgt, was der hivemq Broker leider nicht tut. Datum und Uhrzeit der letzten Übertragung werden mit gesetztem Retain Flag übertragen, damit sich der Anwender jederzeit diese Zeitangabe zeigen lassen kann.

2024-02-02