Frigate » Forward events and snapshots to ntfy#
About#
This tutorial presents a notification pipeline that uses mqttwarn to forward Frigate events as ntfy notifications. It looks like this:
Frigate -> Mosquitto -> mqttwarn -> ntfy
Components#
Frigate (Frigate on GitHub) is a network video recorder (NVR) with realtime local object detection for IP cameras. It uses MQTT to publish events in JSON format and camera pictures in JPEG format.
Eclipse Mosquitto (Mosquitto on GitHub) is an open source message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.
mqttwarn (mqttwarn on GitHub) is a highly configurable MQTT message router, where the routing targets are notification plugins, written in Python. mqttwarn has a corresponding notification plugin adapter for ntfy.
ntfy (ntfy on GitHub) is a simple HTTP-based pub-sub notification service, allowing you to send notifications to your phone or desktop from any computer, entirely without signup, cost or setup.
Synopsis#
Publish Frigate sample events.
cat assets/frigate-event-new-good.json | jq -c | mosquitto_pub -t 'frigate/events' -l
mosquitto_pub -f goat.png -t 'frigate/cam-testdrive/goat/snapshot'
Enjoy the outcome.
Usage#
Configuration#
Inspect the mqttwarn configuration file frigate.ini and adjust it to your needs before running mqttwarn with it. The corresponding user-defined functions are stored in frigate.py.
Prerequisites#
Acquire sources and go to the right directory:
git clone https://github.com/mqtt-tools/mqttwarn
cd mqttwarn/examples/frigate
In a box#
Start the Mosquitto MQTT broker and the ntfy service:
docker compose up
Subscribe to the ntfy topic by visiting http://localhost:5555/frigate-testdrive.
Run mqttwarn:
MQTTWARNINI=frigate.ini mqttwarn
Run the example publisher program:
./publish.sh
Manually#
Publish a few example events individually:
cat assets/frigate-event-new-good.json | jq -c | mosquitto_pub -t 'frigate/events' -l
cat assets/frigate-event-end.json | jq -c | mosquitto_pub -t 'frigate/events' -l
cat assets/frigate-event-false-positive.json | jq -c | mosquitto_pub -t 'frigate/events' -l
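The `jq -c` step in these commands matters because `mosquitto_pub -l` publishes one message per input line, so each event has to be compacted to a single line first. As a minimal sketch, the same kind of payload can be prepared in Python. The helper name `make_event_line` and all sample values are assumptions for illustration; only the field names are taken from what frigate.py reads, and real Frigate events carry many more fields.

```python
import json
import time


def make_event_line(camera: str, label: str, zones: list, event_type: str = "new") -> str:
    """Return a simplified Frigate-like event as compact single-line JSON (hypothetical helper)."""
    after = {
        "camera": camera,
        "label": label,
        "sub_label": None,
        "false_positive": False,
        "frame_time": time.time(),
        "current_zones": zones,
        "entered_zones": zones,
    }
    # Compact separators and no indentation keep the whole document on one line.
    return json.dumps({"type": event_type, "before": None, "after": after}, separators=(",", ":"))


line = make_event_line("cam-testdrive", "goat", ["lawn"])
print(line)
```

The printed line can be piped into `mosquitto_pub -t 'frigate/events' -l` in place of the `cat ... | jq -c` part.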
Publish an example image:
wget -O goat.png https://user-images.githubusercontent.com/453543/231550862-5a64ac7c-bdfa-4509-86b8-b1a770899647.png
mosquitto_pub -f goat.png -t 'frigate/cam-testdrive/goat/snapshot'
open /tmp/mqttwarn-frigate-cam-testdrive-goat.png
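The file path in the last command follows from the snapshot topic: the camera and object names in the topic select a `store-image` target, which maps to a file under /tmp. The sketch below illustrates that mapping, assuming the topic layout and target table from frigate.ini. The function name `snapshot_path` is hypothetical, and the pattern is compiled once at module level.

```python
import re
from typing import Optional

# Compiled once at import time, rather than on every message.
SNAPSHOT_TOPIC = re.compile(r"^frigate/(?P<camera_name>.+?)/(?P<object_name>.+?)/snapshot$")

# Mirrors the `store-image` targets from frigate.ini.
TARGETS = {
    "cam-testdrive-goat": "/tmp/mqttwarn-frigate-cam-testdrive-goat.png",
    "cam-testdrive-squirrel": "/tmp/mqttwarn-frigate-cam-testdrive-squirrel.png",
}


def snapshot_path(topic: str) -> Optional[str]:
    """Return the file path a snapshot topic maps to, or None when nothing matches."""
    match = SNAPSHOT_TOPIC.match(topic)
    if match is None:
        return None
    # Same interpolation as `targets = store-image:{camera_name}-{object_name}`.
    target = "{camera_name}-{object_name}".format(**match.groupdict())
    return TARGETS.get(target)


print(snapshot_path("frigate/cam-testdrive/goat/snapshot"))
print(snapshot_path("frigate/events"))
```

Snapshots for a camera and object combination that is not listed in the target table have no destination file in this sketch, which is why the function returns None for them.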
Details#
The implementation is based on mqttwarn core, its ntfy service plugin, the mqttwarn configuration file frigate.ini, as well as the user-defined function file frigate.py. You can inspect them below.
Inspect configuration file frigate.ini
# Frigate » Forward events and snapshots to Ntfy, using mqttwarn.
# https://mqttwarn.readthedocs.io/en/latest/examples/frigate/README.html
[defaults]
functions = frigate.py
launch = ntfy, store-image
status_publish = True
# This scenario needs two workers, because it needs the headroom of two threads
# running in parallel, to synchronize _two_ distinct Frigate events with each other,
# in order to send out _one_ notification.
num_workers = 2
# =====================
# Frigate event to ntfy
# =====================
# Format: JSON
# Docs: https://docs.frigate.video/integrations/mqtt/#frigateevents
[config:ntfy]
targets = {
    'test': {
        'url': 'http://username:password@localhost:5555/frigate-testdrive',
        'file': '/tmp/mqttwarn-frigate-{camera}-{label}.png',
        'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}',
        # Wait for the file to arrive for three quarters of a second, and delete it after reading.
        '__settings__': {
            'file_retry_tries': 10,
            'file_retry_interval': 0.075,
            'file_unlink': True,
            }
        }
    }
[frigate/events]
filter = frigate_events_filter()
alldata = frigate_events()
targets = ntfy:test
title = {event.label} entered {event.entered_zones_str} at {event.time}
format = {event.label} was in {event.current_zones_str} before
# Limit the alert based on camera/zone.
frigate_skip_rules = {
    'rule-1': {'camera': ['frontyard'], 'entered_zones': ['lawn']},
    }
# =====================
# Frigate image to file
# =====================
# Format: Binary (PNG or JPEG)
# Docs: https://docs.frigate.video/integrations/mqtt/#frigatecamera_nameobject_namesnapshot
[config:store-image]
module = file
targets = {
    'cam-testdrive-goat': ['/tmp/mqttwarn-frigate-cam-testdrive-goat.png'],
    'cam-testdrive-squirrel': ['/tmp/mqttwarn-frigate-cam-testdrive-squirrel.png'],
    }
# Configure `file` plugin to pass through payload 1:1.
append_newline = False
decode_utf8 = False
overwrite = True
[frigate/+/+/snapshot]
alldata = frigate_snapshot_decode_topic()
targets = store-image:{camera_name}-{object_name}
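The `title`, `format` and `click` options in this configuration use Python format-string syntax, with attribute and index access on the `event` object. As a rough illustration of how such placeholders resolve, the sketch below uses plain `str.format` with a stand-in dataclass. `DemoEvent` and its values are hypothetical, and mqttwarn's own rendering may differ in detail.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DemoEvent:
    """Hypothetical stand-in for the FrigateEvent class in frigate.py."""

    camera: str
    label: str
    entered_zones: List[str] = field(default_factory=list)

    @property
    def entered_zones_str(self) -> str:
        # Underscores become spaces, items are joined with commas.
        return ", ".join(zone.replace("_", " ") for zone in self.entered_zones)


event = DemoEvent(camera="cam-testdrive", label="goat", entered_zones=["front_lawn"])

# Attribute access inside a replacement field.
title = "{event.label} entered {event.entered_zones_str}".format(event=event)
# Index access inside a replacement field.
click = "https://httpbin.org/anything?camera={event.camera}&zone={event.entered_zones[0]}".format(event=event)

print(title)
print(click)
```

Note that `{event.entered_zones[0]}` raises an `IndexError` for an empty list. In this setup, the filter function rejects events with an empty `entered_zones` field before any template is rendered.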
Inspect user-defined function file frigate.py
# -*- coding: utf-8 -*-
"""
Frigate » Forward events and snapshots to Ntfy, using mqttwarn.
https://mqttwarn.readthedocs.io/en/latest/examples/frigate/README.html
"""
import dataclasses
import json
import re
import typing as t
from collections import OrderedDict
from datetime import datetime, timezone
from mqttwarn.context import RuntimeContext
from mqttwarn.model import Service


@dataclasses.dataclass
class FrigateEvent:
    """
    Manage inbound event data received from Frigate.
    """

    time: datetime
    camera: str
    label: str
    current_zones: t.List[str]
    entered_zones: t.List[str]

    @staticmethod
    def format_list(value: t.List[str]) -> t.List[str]:
        """
        Format a list for human consumption.
        """
        return [y.replace("_", " ") for y in value]

    @property
    def current_zones_str(self) -> str:
        """
        Serialize list of `current_zones` to string.
        """
        return ", ".join(self.format_list(self.current_zones or []))

    @property
    def entered_zones_str(self) -> str:
        """
        Serialize list of `entered_zones` to string.
        """
        return ", ".join(self.format_list(self.entered_zones or []))

    def to_dict(self) -> t.Dict[str, str]:
        """
        Return Python dictionary from attributes.
        """
        return dataclasses.asdict(self)

    @classmethod
    def from_json(cls, payload: str) -> "FrigateEvent":
        """
        Decode inbound Frigate event, in JSON format.
        """
        # Decode JSON message.
        after = json.loads(payload)["after"]

        # Decode inbound Frigate event.
        return cls(
            time=datetime.fromtimestamp(after["frame_time"], tz=timezone.utc),
            camera=after["camera"],
            label=after["sub_label"] or after["label"],
            current_zones=after["current_zones"],
            entered_zones=after["entered_zones"],
        )


ContainerType = t.Dict[str, t.Union[str, FrigateEvent]]


def frigate_events(topic: str, data: t.Dict[str, str], srv: Service) -> ContainerType:
    """
    mqttwarn transformation function which computes options to be submitted to ntfy.
    """

    # Decode inbound Frigate event.
    event = FrigateEvent.from_json(data["payload"])

    # Collect outbound ntfy option fields.
    params: ContainerType = OrderedDict()
    params.update(event.to_dict())

    # Also add the event object as a whole, to let downstream templates leverage it.
    params["event"] = event

    return params


def frigate_events_filter(topic: str, payload: str, section: str, srv: Service) -> bool:
    """
    mqttwarn filter function to only use `new` and important `update` Frigate events.

    Additionally, validate more details within the event message,
    specifically the `after` section. For example, skip false positives.

    :return: True if message should be filtered, i.e. notification should be skipped.
    """

    try:
        message = json.loads(payload)
    except json.JSONDecodeError as e:
        srv.logging.warning(f"Can't parse Frigate event message: {e}")
        return True

    # Ignore ending messages.
    message_type = message.get("type")
    if message_type == "end":
        srv.logging.warning(f"Frigate event skipped, ignoring Message type '{message_type}'")
        return True

    # Payload must have an `after` key.
    elif "after" not in message:
        srv.logging.warning("Frigate event skipped, 'after' missing from payload")
        return True

    after = message.get("after")

    nonempty_fields = ["false_positive", "camera", "label", "current_zones", "entered_zones", "frame_time"]
    for field in nonempty_fields:

        # Validate field exists.
        if field not in after:
            srv.logging.warning(f"Frigate event skipped, missing field: {field}")
            return True

        value = after.get(field)

        # We can ignore if `current_zones` is empty.
        if field == "current_zones":
            continue

        # Check if it's a false positive.
        if field == "false_positive":
            if value is True:
                srv.logging.warning("Frigate event skipped, it is a false positive")
                return True
            else:
                continue

        # All other keys should be present and have values.
        if not value:
            srv.logging.warning(f"Frigate event skipped, field is empty: {field}")
            return True

    # Ignore unimportant `update` events.
    before = message.get("before")
    if message_type == "update" and isinstance(before, dict):
        if before.get("stationary") is True and after.get("stationary") is True:
            srv.logging.warning("Frigate event skipped, object is stationary")
            return True
        elif after["current_zones"] == after["entered_zones"] or (
            before["current_zones"] == after["current_zones"] and before["entered_zones"] == after["entered_zones"]
        ):
            srv.logging.warning("Frigate event skipped, object stayed within same zone")
            return True

    # Evaluate optional skip rules.
    context: RuntimeContext = srv.mwcore["context"]
    frigate_skip_rules = context.config.getdict(section, "frigate_skip_rules")
    for rule in frigate_skip_rules.values():
        do_skip = True
        for field_name, skip_values in rule.items():
            actual_value = after[field_name]
            if isinstance(actual_value, list):
                do_skip = do_skip and all(value in skip_values for value in actual_value)
            else:
                do_skip = do_skip and actual_value in skip_values
        if do_skip:
            srv.logging.warning("Frigate event skipped, object did not enter zone of interest")
            return True

    return False


def frigate_snapshot_decode_topic(topic: str, data: t.Dict[str, str], srv: Service) -> t.Optional[t.Dict[str, str]]:
    """
    Decode Frigate MQTT topic for image snapshots.

    frigate/+/+/snapshot

    See also:
    - https://docs.frigate.video/integrations/mqtt/#frigatecamera_nameobject_namesnapshot
    """
    topology = {}
    if isinstance(topic, str):
        try:
            # TODO: Compile pattern only once, for efficiency reasons.
            pattern = r"^frigate/(?P<camera_name>.+?)/(?P<object_name>.+?)/snapshot$"
            p = re.compile(pattern)
            m = p.match(topic)
            if m:
                topology = m.groupdict()
        except Exception:
            # Return an empty mapping on any decoding error, without
            # swallowing KeyboardInterrupt or SystemExit.
            pass
    return topology
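
The skip-rule loop at the end of `frigate_events_filter` is easy to misread: a rule matches only when every field in it matches, and a list-valued field such as `entered_zones` matches only when all of its items are listed in the rule. The sketch below isolates that logic in a hypothetical function `matches_skip_rule`, so it can be tried without mqttwarn. The sample events are made up for illustration.

```python
from typing import Any, Dict, List


def matches_skip_rule(after: Dict[str, Any], rule: Dict[str, List[str]]) -> bool:
    """Return True when the event's `after` section satisfies every field of the rule."""
    for field_name, skip_values in rule.items():
        actual_value = after[field_name]
        if isinstance(actual_value, list):
            # Every item of a list-valued field must be covered by the rule.
            if not all(value in skip_values for value in actual_value):
                return False
        elif actual_value not in skip_values:
            return False
    return True


# Same shape as `rule-1` in frigate.ini.
rule = {"camera": ["frontyard"], "entered_zones": ["lawn"]}

only_lawn = {"camera": "frontyard", "entered_zones": ["lawn"]}
lawn_and_porch = {"camera": "frontyard", "entered_zones": ["lawn", "porch"]}
other_camera = {"camera": "backyard", "entered_zones": ["lawn"]}

print(matches_skip_rule(only_lawn, rule))       # True: notification is skipped
print(matches_skip_rule(lawn_and_porch, rule))  # False: "porch" is not covered by the rule
print(matches_skip_rule(other_camera, rule))    # False: camera does not match
```

An object that entered both the lawn and another zone is therefore still reported, because the rule only suppresses events confined to the listed zones.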
Tests#
The test_frigate.py file covers different code paths by running a few Frigate event message samples through the machinery, and inspecting their outcomes. You can invoke the test cases either as part of the complete test suite, or by running them from this directory:
pytest --no-cov -k frigate
pytest --no-cov test_frigate.py
Attributions#
Acknowledgements#
Sev for coming up with the idea of using mqttwarn to connect Frigate with ntfy
Content#
The copyright of data, in particular images and pictograms, is held by their respective owners, unless otherwise noted.
Example snapshot image#
Description: A picture of a Changthangi goat
Date: April 7, 2023
Source: Own work via Unsplash
Author: Jaromír Kalina
License: Unsplash License