Transformations#
Introduction#
This section of the documentation will outline how to configure basic and advanced data transformations to apply to the data processed by mqttwarn.
You will learn how to leverage corresponding options on configuration sections
for mqttwarn topics, like format
, template
, filter
, and
alldata
, in order to manipulate the outbound data destined to the
services.
Overview#
This guideline starts out easy, but will advance to more sophisticated configuration details and examples. Please read this document carefully.
Verbatim forwarding#
To simply forward an incoming MQTT message without any transformations, you don’t
need to do anything other than configure the target. Add a topic section to your
mqttwarn.ini
, by simply naming it after the topic you wish to have forwarded,
and within the configuration section body, define the targets
option.
The payload of the inbound message will then be forwarded to the defined Services plugin. Whether it simply says “ON”, or contains a large JSON dictionary, is of no concern, because the message is forwarded verbatim.
[office/ups]
targets = log:debug
This example demonstrates how to subscribe to messages on the MQTT topic
office/ups
, and save them into the mqttwarn.log
file with a debug
label.
Note
This assumes that you have configured the log section the way described at the Services.
Transformation data#
The information that is available to you in creating the outbound message is called “the transformation data”. A very basic set of transformation data is the following:
{
"topic": "foo/bar", # MQTT topic
"payload": "Hello world!", # MQTT message payload
"_dtepoch": 1392628581, # "epoch time"
"_dtiso": "2014-02-17T10:38:43.910691Z", # ISO date (UTC)
"_dthhmm": "10:16", # timestamp HH:MM (local)
"_dthhmmss": "10:16:21", # timestamp HH:MM:SS (local)
}
The transformation data can be extended by running decoding functions.
Basic formatting#
mqttwarn provides several options to create a different outbound message, allowing you, for example, to make it more human-readable.
The most basic option is called “formatting”. The title
and format
options
define the title and the body of the outbound message. With that feature,
you can turn an MQTT payload that simply states “ON”, into a friendlier version.
Both options effectively define template strings, where transformation data variables can be interpolated into.
[office/ups]
title = Office UPS
format = The office UPS is {payload}
Please note that the original MQTT payload is referenced per payload
variable,
so that if the UPS is switched off, and sends out a corresponding MQTT message,
the outbound message will state it so.
Template files#
Instead of formatting output with the format
specification as described
above, mqttwarn can render the output message with Jinja2 templates.
This is particularly interesting for the smtp
, nntp
, and file
targets.
Consider the following example topic configuration, where we
illustrate using the template
option instead of format
.
[nn/+]
targets = nntp:jpaa
; format = {name}: {number} => {_dthhmm}
template = demo.j2
mqttwarn loads Jinja2 templates from the templates/
directory relative
to the configured directory
.
Example#
Assuming we have the following template file templates/demo.j2
:
{#
this is a comment
in Jinja2
See https://jinja.palletsprojects.com/templates/ for information
on Jinja2 templates.
#}
{% set upname = name | upper %}
{% set width = 60 %}
{% for n in range(0, width) %}-{% endfor %}
Name.................: {{ upname }}
Number...............: {{ number }}
Timestamp............: {{ _dthhmm }}
Original payload.....: {{ payload }}
After processing, it will produce the following outcome, which will be forwarded to any target which uses this configuration.
------------------------------------------------------------
Name.................: JANE JOLIE
Number...............: 47
Timestamp............: 19:15
Original payload.....: {"name":"Jane Jolie","number":47, "id":91}
More details#
The template variable {{ payload }}
will interpolate the original MQTT message
payload into the template string.
If the payload was JSON, its content is also decoded and available as template variables, together with all the other transformation data.
If the template cannot be rendered, say, if it contains a Jinja2 error, or if the template file cannot be found, etc., the original raw message is used in lieu on output.
Decoding JSON#
Other than just passing the payload received via MQTT to a service, mqttwarn allows you to do the following:
Transform payloads on a per-topic basis. For example, you know you will be receiving JSON, but you want to warn with a nicely formatted message.
For certain services, you can change the title (or subject) of the outgoing message.
For certain services, you can change the priority of the outgoing message.
Note
mqttwarn will gracefully attempt to decode inbound MQTT messages from JSON. If
it works, it will merge the content into the Transformation data. If not,
nothing else happens, and the raw data will be available within the payload
field.
Note
Embedded "\n"
are converted to newlines on output.
Example#
Consider the following JSON payload published to the MQTT broker:
mosquitto_pub -t 'osx/json' -m '{"fruit":"banana", "price": 63, "tst" : "1391779336"}'
Using the format
option, you can configure mqttwarn to transform that JSON
into a different outbound message, which is actually the text that will get
submitted as notification. The format
option is effectively a template string,
where transformation data variables can be interpolated into.
format = "I'll have a {fruit} if it costs {price}"
The result is:
Decoding nested JSON#
Within templates and formats, you can only refer to the top-level names of an
incoming JSON message, which significantly limits the kinds of messages
mqttwarn
can process. Currently, you will need to flatten your document
using an alldata
transformation function.
That means, you should build a new JSON message with only top-level values, where you pick values of interest from your nested JSON document.
Todo
We are looking into providing a generic way to access nested elements at access nested elements in JSON payloads. In the meanwhile, for doing it in a custom way, please have a look at the (#nested-json-example).
Transforming JSON#
When receiving JSON data like {"data": {"humidity": 62.18}}
, you might
want to extract values using the format
mechanism before forwarding
it to other data sinks.
format = "{data}"
However, by default, the outcome will be the string-serialized form of the
Python representation, {'humidity': 62.18}
, which could not be what you
want if your data sink is expecting JSON format again.
To make it work as intended, you should use appropriate type coercion
for JSON data using the !j
suffix.
format = "{data!j}"
This will serialize the formatted data to JSON format appropriately,
so the outbound message will be {"humidity": 62.18}
.
User-defined functions#
You can define custom code to be invoked on different stages of the processing pipeline. This is a powerful utility. If you are in a hurry, you can skip the documentation, and get inspired what you can do by exploring user-defined functions by example.
Introduction#
A topic section in the configuration file can have different topic options defined, some of them are callbacks where you can configure user-defined Python functions. It works for all of those options:
filter
: boolean, or a function that returns a booleanalldata
: dictionary, or a function that returns a dictionarytitle
: string, or a function that returns a stringformat
: string, or a function that returns a stringimage
: see belowpriority
: see below
Those topic options do not accept a user-defined function:
targets
topic
qos
Getting started#
You will start out by telling mqttwarn where to find your Python code.
Edit the mqttwarn.ini
configuration file, navigate to the [defaults]
section, and configure the functions
option appropriately.
[defaults]
; Path to file containing user-defined functions like `format` or `alldata`.
functions = 'udf.py'
Then, in the topic section, define the MQTT topic to subscribe to,
and configure the user-defined function for the format
option. That’s it.
[test/topic]
format = my_function()
Note
When relative file names are configured at the functions
option, they will
be resolved from the directory of the mqttwarn.ini
file, which is, by default,
the /etc/mqttwarn
directory.
In order to create a blueprint file for your udf.py
, you can use the
mqttwarn make-udf
command.
Filtering#
Within the mqttwarn processing pipeline, filtering functions will be invoked first, and will permit you to terminate processing early.
A function called from the filter
option in a topic section needs
to return True
to stop the outbound notification. It obtains topic
,
payload
, section
, and srv
arguments, where payload
is the MQTT message
payload as a Python string decoded from UTF-8.
The function signature looks like this.
from mqttwarn.model import Service
def example_filter(topic: str, payload: str, section: str, srv: Service):
pass
Decoding#
After filtering, mqttwarn will invoke the decoding step, using the function
defined by the alldata
option in a topic section.
The function is expected to either return a Python dictionary,
or None
. It obtains topic
, data
, and srv
arguments, where data
is
also a dictionary wrapping the inbound MQTT
message, see transformation data. The returned data
is merged into the transformation data.
The function signature looks like this.
from mqttwarn.model import Service
from typing import Any, Dict
def example_alldata(topic: str, data: Dict[str, Any], srv: Service):
pass
The keys in the dictionary returned from this function can be used when
describing the outbound message by using the title
and format
options
of the same topic section, see also Basic formatting.
Formatting#
Both the title
and the format
options in the topic section can contain a
string where {bracketed}
references get resolved using the transformation data
returned from the data mapping function alldata
, see also Basic formatting.
Alternatively, they can call a function that returns a string that may or may not contain such references. The functions called here do not have access to the actual dictionary returned from data mapping functions, though.
For example, a minimal user-defined function suitable for the format
option,
effectively doing nothing, looks like this.
def noop_formatter(data, srv=None):
return data
[test/topic]
format = noop_formatter()
Attention
If a function operating on a message, for example within format =
, returns
None
, or an empty string, the target notification is suppressed.
Todo
Should the detail “do not have access to the actual dictionary” be improved in one way or another?
The Service
object#
What is common to all user-defined functions is that they will receive inbound data from mqttwarn, and it is your obligation to return outbound data or signal information back.
Optionally, the user-defined functions will obtain a srv
argument, which
is an object with a few helper functions, and, at the same time, provides
access to internal machinery like the instance of the paho.mqtt.client.Client
object (which provides a plethora of properties and methods), to the mqttwarn
logger instance, to the Python globals()
method and all that entails, and to
the name of the program currently being executed.
This example function will give you an idea how to use those features.
from mqttwarn.model import Service
from typing import Dict
def publishing_formatter(data: Dict[str, str], srv: Service):
message = "¡Hola!"
srv.logging.info(f"+++++++++++ {message}")
srv.mqttc.publish("topic/response", message, qos=0, retain=False)
return message
Attention
Be advised that if you publish back to the same MQTT topic which triggered the invocation of your function, you will create an endless loop.
User-defined function examples#
In this section, you can explore a few example scenarios where user-defined functions are being used.
format
: Replacing incoming payloads#
The format
option will be evaluated at the last stage of the processing
pipeline. The string returned by a corresponding function replaces the outgoing
message
slot completely.
Consider the following user-defined function,
from mqttwarn.model import Service
from typing import Any
def example_formatter(data: Any, srv: Service):
"""
An example formatting function.
>>> example_formatter({"fruit": "pineapple"})
'Ananas'
"""
if isinstance(data, dict) and "fruit" in data:
return "Ananas"
return None
and this configuration snippet which uses that function, instead of a string template,
[defaults]
; Path to file containing self-defined functions like `format` or `alldata`.
functions = 'udf.py'
[test/topic]
#format = Since when does a {fruit} cost {price}?
format = example_formatter()
and see how it works:
in/a1 {"fruit":"pineapple", "price": 131, "tst" : "1391779336"}
out/food Ananas
out/fruit/pineapple Ananas
Note
When the inbound data can be decoded from JSON, the format
function will be
invoked with decoded JSON as well. The data
argument will then obtain a
Python dictionary. Otherwise, the function will
obtain the raw inbound message payload.
Tip
In the same spirit, you are usually not returning JSON here. If you want to return JSON, make sure to serialize it to a string on your own behalf.
filter
: Filtering notifications#
A notification can be filtered (or suppressed, or ignored) by a user-defined
function, by configuring the filter
option within a topic section.
[owntracks/#/phone]
filter = owntracks_filter()
Now, when a message for the defined topic owntracks/jane/phone
is received,
the function owntracks_filter()
will be invoked to determine whether to
process the message.
The filter function should return True
if the message should be suppressed,
or False
if the message should be processed. This is a basic but working
example for such a filter function.
from mqttwarn.model import Service
def owntracks_filter(topic: str, message: str, section: str, srv: Service):
"""
Only process messages containing "event", skip all others.
>>> owntracks_filter("owntracks/jane/phone", "event: something significant")
False
>>> owntracks_filter("owntracks/jane/phone", "trace: something else")
True
"""
is_event_message = message.find("event") != -1
return not is_event_message
Note
The topic
parameter will be the name of the specific topic that the message
was received on, here owntracks/jane/phone
. The name of the section will be
the section
argument, here owntracks/#/phone
.
alldata
: Decoding topic names#
Use case: An MQTT topic contains information you want to use in transformations.
As an example, let’s consider the OwnTracks system. When an OwnTracks device
detects a change of a configured waypoint or geo-fence, it emits a JSON payload
which looks like this, on a topic like owntracks/<username>/<deviceid>
. For example:
owntracks/jane/phone {"_type": "location", "lat": "52.4770352", "desc": "Home", "event": "leave"}
In order to obtain the username (jane
) and her device name (phone
) from the
MQTT topic, and use it within the outbound message, we need to decode
it into the transformation data. To accomplish that, add a user-defined function,
and assign it to the alldata
option within the configuration file.
This specifies that when a message for the defined topic owntracks/jane/phone
is processed, the user-defined function decode_owntracks_topic()
should be
invoked to transform the data. Topic names may also contain MQTT wildcards.
[owntracks/jane/phone]
alldata = decode_owntracks_topic()
The function we define to do that, looks like this:
def decode_owntracks_topic(topic):
"""
Decode an OwnTracks MQTT topic like `owntracks/<username>/<device>`.
>>> decode_owntracks_topic("owntracks/jane/phone")
{'username': 'jane', 'device': 'phone'}
"""
if isinstance(topic, str):
try:
parts = topic.split('/')
username = parts[1]
device_id = parts[2]
except:
username = 'unknown'
device_id = 'unknown'
return dict(username=username, device=device_id)
return None
The returned dictionary is merged into the transformation data, i.e. it is made available to plugins and to formatters.
Now, if we then create a format
rule like
format = {username}@{device}: {event} => {desc}
the user-defined function will transform it into an outbound message as directed.
jane@phone: leave => Home
alldata
: Decoding nested JSON#
Use case: Say we are receiving messages from a temperature sensor running Tasmota, and we wish to convert them into InfluxDB line format.
The incoming Tasmota JSON status response for a DS18B20 sensor looks like this.
{
"Time": "2018.02.01 21:29:40",
"DS18B20": {
"Temperature": 19.7
},
"TempUnit": "C"
},
Since the nested Temperature
item cannot be referenced directly within a
format
definition, we need to make it a top-level value of the transformation
data. While we are at it, we can change the date to milliseconds since the epoch,
and also include the MQTT topic.
{
"topic": "tasmota/temp/ds/1",
"timestamp": 1517525319000,
"temperature": 19.7
}
This can be accomplished with the following user-defined function.
import ast
import logging
import time
from datetime import datetime
from typing import Dict
from mqttwarn.model import Service
def ds18b20_values(topic: str, data: Dict[str, str], srv: Service):
payload = ast.literal_eval(data["payload"])
ts = datetime.strptime(payload["Time"], "%Y.%m.%d %H:%M:%S")
millis = int(time.mktime(ts.timetuple()) * 1000)
temp = payload["DS18B20"]["temperature"]
outdata = dict( topic = topic, temperature = temp, timestamp = millis )
logging.debug(outdata)
return outdata
When applying it to a topic,
[tasmota/temp/ds/+]
targets = log:info
alldata = ds18b20_values()
format = weather,topic={topic} temperature={temperature} {timestamp}
mqttwarn will format the outbound message like this.
weather,topic=tasmota/temp/ds/1 temperature=19.7 1517525319000