However, if a downlink that is scheduled while handling an uplink is not transmitted after that very uplink, then it’s still scheduled for the next. And then it might be transmitted before a new downlink is scheduled during handling of the next uplink. That’s why for time synchronisation (and possibly other use cases too) one needs some counter, so the node can tell which downlink was created by which uplink.
I think this might also depend on the total traffic that the network server is handling and maybe even on using MQTT or the HTTP Integration. I’ve done quite a bit of testing using the Kickstarter’s The Things Node and The Things Gateway, along with MQTT and Python. The Python code did not have to do a lot, like it did not access some database which might take too much time. Instead, it only scheduled a downlink (using schedule="replace"
) right after being notified of the uplink, to set the LED color depending on what event triggered the uplink, and did quite a bit of logging. That made it easy to show that often the wrong color was set, due to an earlier, pending scheduled downlink. Also the logging of MQTT downlink events would often show that one was “sent” to the gateway before the new one was scheduled. And things might look okay at some point, but not some time later.
Like see the unexpected order of the MQTT .../events/down/sent
(of the pending downlink, which we wanted to replace) before .../events/down/scheduled
(of the newly scheduled downlink, which shows in the log two lines earlier) in:
18:35:29.200 Uplink: ...; fields={'event': 'button', ...}
18:35:29.200 TTN schedule downlink: ...
18:35:29.201 TTN event: ...; type=events/down/sent
18:35:29.228 TTN event: ...; type=events/down/scheduled
This is not related to actually publishing something through MQTT. When not scheduling a downlink (for my code: when the uplink was triggered by a motion event), a pending one might still be sent off to the gateway immediately after receiving the uplink in the MQTT subscription:
18:46:10.632 Uplink: ...; fields={'event': 'motion', ...}
18:46:10.657 TTN event: ...; type=events/down/sent
Maybe using the HTTP Integration reduces the latency that MQTT might show? The HTTP Integration shows the same results. Apparently (back in 2017) the application would have 100ms to schedule/replace a downlink, but that of course assumes that the application gets the uplink (through MQTT or the HTTP Integration) almost instantly:
The Things Node sketch
#include <TheThingsNode.h>
// Set your AppEUI and AppKey
const char *appEui = "70B3D00000000000";
const char *appKey = "00000000000000000000000000000000";
#define loraSerial Serial1
#define debugSerial Serial
// Use TTN_FP_EU868 or TTN_FP_US915
#define freqPlan TTN_FP_EU868
TheThingsNetwork ttn(loraSerial, debugSerial, freqPlan);
TheThingsNode *node;
#define PORT_SETUP 1
#define PORT_INTERVAL 2
#define PORT_MOTION 3
#define PORT_BUTTON 4
/*
Decoder payload function
------------------------
function Decoder(bytes, port) {
var decoded = {};
var events = {
1: 'setup',
2: 'interval',
3: 'motion',
4: 'button'
};
decoded.event = events[port];
decoded.battery = (bytes[0] << 8) + bytes[1];
decoded.light = (bytes[2] << 8) + bytes[3];
if (bytes[4] & 0x80)
decoded.temperature = ((0xffff << 16) + (bytes[4] << 8) + bytes[5]) / 100;
else
decoded.temperature = ((bytes[4] << 8) + bytes[5]) / 100;
return decoded;
}
*/
void setup()
{
loraSerial.begin(57600);
debugSerial.begin(9600);
// Wait a maximum of 10s for Serial Monitor
while (!debugSerial && millis() < 10000)
;
// Config Node
node = TheThingsNode::setup();
node->configLight(true);
node->configInterval(true, 60000);
node->configTemperature(true);
node->onWake(wake);
node->onInterval(interval);
node->onSleep(sleep);
node->onMotionStart(onMotionStart);
node->onButtonRelease(onButtonRelease);
// Test sensors and set LED to GREEN if it works
node->showStatus();
node->setColor(TTN_GREEN);
debugSerial.println("-- TTN: STATUS");
ttn.showStatus();
ttn.onMessage(onMessage);
debugSerial.println("-- TTN: JOIN");
ttn.join(appEui, appKey);
debugSerial.println("-- SEND: SETUP");
sendData(PORT_SETUP);
}
void loop()
{
node->loop();
}
void interval()
{
node->setColor(TTN_BLUE);
debugSerial.println("-- SEND: INTERVAL");
sendData(PORT_INTERVAL);
}
void wake()
{
// node->setColor(TTN_GREEN);
}
void sleep()
{
node->setColor(TTN_BLACK);
}
void onMotionStart()
{
node->setColor(TTN_BLUE);
debugSerial.print("-- SEND: MOTION");
sendData(PORT_MOTION);
}
void onButtonRelease(unsigned long duration)
{
node->setColor(TTN_BLUE);
debugSerial.print("-- SEND: BUTTON");
debugSerial.println(duration);
sendData(PORT_BUTTON);
}
void sendData(uint8_t port)
{
// Wake RN2483
ttn.wake();
ttn.showStatus();
node->showStatus();
byte *bytes;
byte payload[6];
uint16_t battery = node->getBattery();
bytes = (byte *)&battery;
payload[0] = bytes[1];
payload[1] = bytes[0];
uint16_t light = node->getLight();
bytes = (byte *)&light;
payload[2] = bytes[1];
payload[3] = bytes[0];
int16_t temperature = round(node->getTemperatureAsFloat() * 100);
bytes = (byte *)&temperature;
payload[4] = bytes[1];
payload[5] = bytes[0];
ttn.sendBytes(payload, sizeof(payload), port);
// Set RN2483 to sleep mode
ttn.sleep(60000);
// This one is not optionnal, remove it
// and say bye bye to RN2983 sleep mode
delay(50);
}
void onMessage(const uint8_t *payload, size_t size, port_t port)
{
debugSerial.println("-- MESSAGE");
debugSerial.print("Received " + String(size) + " bytes on port " + String(port) + ":");
for (int i = 0; i < size; i++)
{
debugSerial.print(" " + String(payload[i]));
}
debugSerial.println();
node->setRGB((payload[0] & 0b100) > 0, (payload[0] & 0b010) > 0, (payload[0] & 0b001) > 0);
delay(1000);
}
Python MQTT client
A Python client that tries to set the LED color to red when the uplink was due to an expiring interval, or green when its button was pressed, or not set the LED at all when triggered due to motion detection.
You’ll often see that the LED still changes color when moving it, or show the wrong color for the other events, due to pending downlinks being transmitted. Also, the Python code’s logging will make this clear.
# This needs pip install paho-mqtt
import base64
import json
import logging
import paho.mqtt.client as mqtt
import struct
from typing import Callable
# The APP_ID is NOT the AppEUI
APP_ID = "my-app-id"
APP_ACCESS_KEY = "ttn-account-v2.kF<redacted>il4"
MQTT_SERVER = "eu.thethings.network"
MQTT_PORT = 8883
# Based on https://github.com/TheThingsNetwork/python-app-sdk/blob/master/ttn/ttnmqtt.py
class DownlinkMessage:
def __init__(self, port, confirmed=False, schedule=None):
self.port = port
self.confirmed = confirmed
self.schedule = schedule
def obj2json(self):
return str(json.dumps(self.__dict__))
class TtnMqttClient:
def __init__(self, app_id, app_access_key, mqtt_broker, mqtt_port=8883):
self._app_id = app_id
self._app_access_key = app_access_key
self._mqtt_broker = mqtt_broker
self._mqtt_port = mqtt_port
self._on_uplink = None
self._client = mqtt.Client()
# Errors in callbacks are all caught by Paho, and delegated to on_log
self._client.on_log = self.on_log
self._client.on_connect = self.on_connect
self._client.on_disconnect = self.on_disconnect
self._client.on_subscribe = self.on_subscribe
self._client.on_message = self.on_message
self._client.username_pw_set(self._app_id, self._app_access_key)
# ca_certs=None works just well; by default, on Python 2.7.9+ or 3.4+,
# the default certification authority of the system is used.
self._client.tls_set()
def run_forever(self):
self.connect()
# Will also reconnect if needed
self._client.loop_forever()
def stop(self):
self._client.loop_stop()
self._client.disconnect()
def connect(self):
self._client.connect(self._mqtt_broker, self._mqtt_port)
def on_log(self, client, userdata, level, buf):
"""
Log all MQTT protocol events, and the exceptions in callbacks that have
been caught by Paho.
"""
logging_level = mqtt.LOGGING_LEVEL[level]
logging.log(logging_level, buf)
def on_connect(self, client, userdata, flags, rc):
logging.info("MQTT connected: result code=%i", rc)
if rc == 0:
# Subscribe to all events (activations, up, down, ...) for all devices
topic = "{}/#".format(APP_ID)
res = client.subscribe(topic)
logging.info("MQTT subscribing: topic=%s", topic)
if res[0] != mqtt.MQTT_ERR_SUCCESS:
raise RuntimeError("the client is not connected")
if rc == 1:
raise RuntimeError("connection refused - incorrect protocol version")
if rc == 2:
raise RuntimeError("connection refused - invalid client identifier")
if rc == 3:
raise RuntimeError("connection refused - server unavailable")
if rc == 4:
raise RuntimeError("connection refused - bad app_id or access_key")
if rc == 5:
raise RuntimeError("connection refused - not authorised")
# 6 - 255: currently unused
def on_disconnect(self, client, userdata, rc):
# Result is MQTT_ERR_SUCCESS if disconnected due to explicitly invoking
# disconnect. For all other cases, due to using `loop_forever()`, the
# Paho client will reconnect automatically.
logging.warning("MQTT disconnect: result code=%i", rc)
def on_subscribe(self, client, userdata, mid, granted_qos):
logging.info("MQTT subscribed")
def on_message(self, client, userdata, msg):
"""
Log the events, and delegate uplinks to the registered handler, if any.
"""
payload = msg.payload.decode("utf-8")
levels = msg.topic.split('/', 3)
dev_id = levels[2]
# E.g. events/activations, up, events/down/scheduled, events/down/sent
type = levels[3]
logging.debug("MQTT message: dev_id=%s; topic=%s; payload=%s", dev_id, msg.topic, payload)
logging.info("TTN event: dev_id=%s; type=%s", dev_id, type)
if type == "up" and self._on_uplink is not None:
# Curried downlink helper
def schedule_downlink(bytes, port=1):
self.schedule_raw(dev_id, bytes, port)
self._on_uplink(msg, schedule_downlink)
def schedule_raw(self, dev_id, bytes, port=1, confirmed=False, schedule="replace"):
"""
Schedule a downlink for the given raw bytes.
"""
topic = "{}/devices/{}/down".format(APP_ID, dev_id)
raw = base64.b64encode(bytes).decode("utf-8")
message = DownlinkMessage(port, confirmed, schedule)
message.payload_raw = raw
msg = message.obj2json()
logging.info("TTN schedule downlink: dev_id=%s; message=%s", dev_id, msg)
res = self._client.publish(topic, msg)
if res.rc != mqtt.MQTT_ERR_SUCCESS:
raise RuntimeError("client not connected")
@property
def on_uplink(self):
return self._on_uplink
@on_uplink.setter
def on_uplink(self, func: Callable[[bytes, Callable[[bytes, int], None]], None]):
self._on_uplink = func
def main():
logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
def handle_uplink(msg, schedule_downlink):
"""
Handle an uplink from a The Things Node, by trying to set its LED in case of
button and interval events.
Scheduling the downlink often will not yield a transmission right after this
very uplink. The colors indicate after which type of uplink the downlink was
scheduled, and seeing any LED after a motion event also indicates that a
previous downlink was scheduled too late.
"""
payload = json.loads(msg.payload.decode("utf-8"))
port = payload["port"]
dev_id = payload["dev_id"]
# As set in the Payload Formats' Decoder in TTN Console. We could also just
# use msg.port here.
fields = payload["payload_fields"]
event = fields["event"]
logging.info("Uplink: dev_id=%s; port=%i; fields=%s", dev_id, port, fields)
# The Arduino sketch uses blue (RGB 001) for the uplink
if event == "interval":
# red
rgb = 0b100
elif event == "button":
# green
rgb = 0b010
elif event == "motion":
# Don't schedule a downlink; an earlier downlink might still be pending though...
rgb = 0
else:
# magenta = red and blue
rgb = 0b101
if rgb > 0:
schedule_downlink(struct.pack('b', rgb))
ttnode = TtnMqttClient(APP_ID, APP_ACCESS_KEY, MQTT_SERVER, MQTT_PORT)
ttnode.on_uplink = handle_uplink
ttnode.run_forever()
if __name__ == '__main__':
main()