# RasperryPi Kamera Überwachung mit Telegram Alarmierung

L1

# Problemstellung

Wenn es um das Thema Überwachungskamera geht ist die Frage des Vertrauens besonders wichtig. Nicht nur soll es zuverlässig funktionieren, sondern auch soll es niemanden möglich sein auf die Daten zuzugreifen. Diametral zum Lockdown möchte man als Besitzer jederzeit und von überall mal nachgucken können und im Falle einer Erkennung zeitnah alarmiert werden. In diesem Projekt wird ein Raspberry Pi mit Kamera ausgestattet, die open-source Software "motion" zur Bewegungserkennung verwendet und an eine Telegram Gruppe ein Alarm geschickt, welcher gleich ein Schanppschuß von der Situation enthält.

# Hardware

  • Raspberry Pi 4
  • Raspberry Pi Nachtsicht Kamera, Fisheye

# These

Die verwendetet Software "motion" kann im Falle eines Ereignisses automatisch eine Aufnahme starten. Zusätzlich gibt es die Möglichkeit einen Datenbankeintrag zu erstellen und dann entsprechend über ein cronjob auf diese Ereignisse zu reagieren. Das ist mir jedoch zu sehr 1980er und daher verwende ich die eingebaute Methode um je nach Ereignis, z.b. on_picture_save, on_movie_start, etc, ein Programm mit diversen Parametern zu starten, welches die Informationen als JSON String an meinen MQTT-Bus sendet. Von dort aus kann ich auf die Ergeignisse auf verschiedenste Arten reagieren.

# Experiment

# Motion - automatische Aufnahme

apt-get install motion

Die Konfiguration wird entsprechend angepasst und erstmal getestet, das eine Aufnahme erfolgt und die Video Einstellungen so sinnvoll sind.

/etc/motion/motion.conf

daemon on
process_id_file /var/run/motion/motion.pid
setup_mode off
logfile /var/log/motion/motion.log
log_level 6
log_type all
videodevice /dev/video0
v4l2_palette 17
input -1
norm 0
frequency 0
power_line_frequency -1
rotate 180
flip_axis none
width 1024
height 768
framerate 25
minimum_frame_time 0
netcam_keepalive off
netcam_tolerant_check off
rtsp_uses_tcp on
auto_brightness on
brightness 0
contrast 0
saturation 0
hue 0
roundrobin_frames 1
roundrobin_skip 1
switchfilter off
threshold 1500
threshold_tune on
noise_level 32
noise_tune on
despeckle_filter EedDl
smart_mask_speed 0
lightswitch 50
minimum_motion_frames 1
pre_capture 0
post_capture 0
event_gap 60
max_movie_time 0
emulate_motion off
output_pictures center
output_debug_pictures off
quality 75
picture_type jpeg
ffmpeg_output_movies on
ffmpeg_output_debug_movies off
ffmpeg_bps 400000
ffmpeg_variable_bitrate 0
ffmpeg_video_codec mp4
ffmpeg_duplicate_frames true
timelapse_interval 0
timelapse_mode daily
timelapse_fps 30
timelapse_codec mpeg4
use_extpipe off
snapshot_interval 0
locate_motion_mode on
locate_motion_style box
text_right %Y-%m-%d\n%T-%q
text_left CAMERA %t
text_changes on
text_event %Y%m%d%H%M%S
text_double on
target_dir /home/pi/cam
snapshot_filename %v-%Y%m%d%H%M%S-snapshot
picture_filename %v-%Y%m%d%H%M%S-%q
movie_filename %v-%Y%m%d%H%M%S
timelapse_filename %Y%m%d-timelapse
ipv6_enabled off
stream_port 8081
stream_quality 50
stream_motion off
stream_maxrate 1
stream_localhost off
stream_limit 0
stream_auth_method 0
webcontrol_port 8080
webcontrol_localhost on
webcontrol_html_output on
webcontrol_parms 0
track_type 0
track_auto off
track_iomojo_id 0
track_step_angle_x 10
track_step_angle_y 10
track_move_wait 10
track_speed 255
track_stepsize 40
quiet on

# Nachrichten an MQTT schicken

Hierzu bedienen wir uns der eingebauten script Unterstützung und mosquitto_pub für das Senden der JSON Strings. Die Konfiguration wird einfach an die /etc/motion/motion.conf angefügt und anschliessend mittels "service motion restart" aktiviert.

on_picture_save /usr/bin/mosquitto_pub -t "fishberry/cam/%t/picture_save" -m "{\"filename\": \"%f\"}"
on_motion_detected /usr/bin/mosquitto_pub -t "fishberry/cam/%t/motion_detected" -m "{\"pixel\": %D,\"noise\": %N, \"motionwidth\": %i, \"motionheight\": %J, \"xcoord\": %K, \"ycoord\": %L }"
on_movie_start /usr/bin/mosquitto_pub -t "fishberry/cam/%t/movie_start" -m "{\"filename\": \"%f\"}"
on_movie_end /usr/bin/mosquitto_pub -t "fishberry/cam/%t/movie_end" -m "{\"filename\": \"%f\"}"
on_camera_lost /usr/bin/mosquitto_pub -t "fishberry/cam/%t/camera_lost" -m "1"
on_camera_found /usr/bin/mosquitto_pub -t "fishberry/cam/%t/camera_found" -m "1"

An dieser Stelle sollte nun im Falle einer Bewegungserkennung auf dem MQTT Bus eine Nachricht wie diese zu sehen sein.

mosquitto_sub -v -t 'fishberry/#'
fishberry/cam/0/picture_save {"filename": "/home/pi/cam/01-20201114211113-04.jpg"}

# Telegram ChatBot / Gruppe erstellen

Um Nachrichten an das Telegram Netzwerk zu senden, muss man sich authentifizieren. Um das spätere Testen zu vereinfacher, empfehle ich die Telegram App zusätzlich zum Handy auch auf dem Computer zu installieren, da man so einfach Daten kopieren kann.

Jetzt mit der Telegram App (Computer oder Handy), den BotFather öffnen https://telegram.me/BotFather (opens new window).

  • /mybots: Wenn man schon Bots eingereichtet hat, kann man hiermit den API Token auslesen
  • /newbot: hiermit legt man einen neuen an

Jetzt haben wir den notwendigen API Token um Nachrichten verschicken zu können. Diesen API Token muss man geheim halten, da sonst jemand Drittes unter unserem Namen Nachrichten verschicken könnte.

Als nächstes legt man eine Gruppe an und lädt den angelegten Bot zu der Gruppe ein.

Um nun eine Nachricht an diese Gruppe zu verschicken, müssen wir noch die ID dieser Gruppe herausfinden.

https://api.telegram.org/bot<YourBOTToken>/getUpdates
{"ok":true,"result":[]}

Da bisher noch keine Nachrichten ausgetauscht worden sind, ist die Liste für diesen Bot leer. Wenn man nun in der Telegram Gruppe eine "test" Nachricht schickt, sieht der Ergebnis so aus

curl https://api.telegram.org/bot<YourBOTToken>/getUpdates| json_pp 
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   296  100   296    0     0   2596      0 --:--:-- --:--:-- --:--:--  2573
{
   "result" : [
      {
         "message" : {
            "date" : 1605467234,
            "text" : "test",
            "from" : {
               "last_name" : "Curth",
               "language_code" : "de",
               "id" : 123455667xxx,
               "first_name" : "Sascha",
               "is_bot" : false
            },
            "chat" : {
               "last_name" : "Curth",
               "first_name" : "Sascha",
               "id" : 12354656xxx,     <----- Das ist die Chat ID
               "type" : "private"
            },
            "message_id" : 2218
         },
         "update_id" : 100033423
      }
   ],
   "ok" : true
}

Diese chat ID "12354656xxx" brauchen wir neben dem API Token im nächsten Schritt.

# Python Programm zur Alarmierung

Nun schreiben wir ein kleines python script, welches am MQTT auf eine "picture_save" Nachricht lauert und das Schnappschussbild an die Telegram Gruppe sendet.

#!/usr/bin/python3

import paho.mqtt.client as mqtt
import sys
import json
import telepot

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print("Connected with result code {0}".format(str(rc)))  # Print result of connection attempt
    client.subscribe("fishberry/cam/0/picture_save")

def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    print("Message received-> " + msg.topic + " " + str(msg.payload))  # Print a received msg
    json_obj=json.loads(msg.payload)
    if "filename" in json_obj:
        result=json_obj["filename"]
        bot.sendPhoto(bot_chatID, photo=open(result, 'rb'), caption=result)
    else:
        print("No filename, skipping")
 
def main(argv=None):
    global bot_token
    global bot_chatID
    global bot

    bot_token = "12345678:ABCDEFGHT...."
    bot_chatID = "987654321"
    bot = telepot.Bot(bot_token)

    global client
    client = mqtt.Client("picture_save_listener")  # Create instance of client with client ID “digi_mqtt_test”
    client.on_connect = on_connect  # Define callback function for successful connection
    client.on_message = on_message  # Define callback function for receipt of a message
    client.connect('127.0.0.1', 1883)
    client.loop_forever()  # Start networking daemon

if __name__ == "__main__":
    main()

Um nicht immer vor der Kamera rumhampeln zu müssen, kann man von nun an mit einem beliebigen Bild testen und eine MQTT Nachricht schicken.

mosquitto_pub -t 'fishberry/cam/0/picture_save' -m '{"filename": "/home/pi/cam/01-20201114211113-04.jpg"}'

# Telegram Bot Rückkanal / 2 Wege Kommunikation

Was ist wenn ich via Telegram App gerne einen aktuellen Schnappschuß anfordern möchte? Dazu muss das python Skript etwas angepasst werden.

...
from telepot.loop import MessageLoop
import requests
...
def handle(msg):
    content_type, chat_type, chat_id = telepot.glance(msg)
    print(content_type, chat_type, chat_id)

    if content_type == 'text':
        if msg['text'] == '/cam_snap':
            r =requests.get('http://127.0.0.1:8080/0/action/snapshot')
        elif msg['text'] == '/cam_pause':
            r =requests.get('http://127.0.0.1:8080/0/detection/pause')
        elif msg['text'] == '/cam_start':
            r =requests.get('http://127.0.0.1:8080/0/detection/start')

def main(argv=None):
    global bot_token
    global bot_chatID
    global bot

    bot_token = "12345678:ABCDEFGHT...."
    bot_chatID = "987654321"
    bot = telepot.Bot(bot_token)

    MessageLoop(bot, handle).run_as_thread()
...

Mit der Programmlogik kann man nun z.b. "/cam_snap" in den Chat schreiben, was dazu führt das die besagte URL vom python Prozess aufgerufen wird und motion einen Schnappschuss erzeugt. Wie bisher wird diese Aktion dann mittels mosquitto_pub an den MQTT gemeldet und das Bild entsprechend gesendet.

# Kamera Status abfragen

Die Bewegungserkennung zu pausieren macht durchaus manchmal Sinn, aber man muss auch den Status abfragen können. Dazu wird einfach der Befehlssatz erweitert.

...
        elif msg['text'] == '/cam_status':
            r =requests.get('http://127.0.0.1:8080/0/detection/status')
            bot.sendMessage(chat_id, r.text)
...

Die Ausgabe erfolgt standardmäßig mit HTML tags und sieht wiefolgt aus

<!DOCTYPE html>
<html>
<head><title>Motion 4.1.1</title></head>
<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=yes">
<body>
<a href=/0/detection>&lt;&ndash; back</a><br><br><b>Camera 0</b> Detection status ACTIVE
</body>
</html>

Um das ganze für den Anwendungsfall besser leserlich zu gestalten, kann man die Ausgabe auf das raw Format umstellen.

/etc/motion/motion.conf

...
webcontrol_html_output off
...
Camera 0 Detection status ACTIVE

# Telgram CustomKeyboard

Statt die Kommandos einzutippen, wie zb. /cam_snap, wäre es doch noch besser wenn es einen Button gäbe mit dieser Funktion.

Dazu verändern wir die on_chat_message so, dass sobald irgendwas angefragt wird eine Antwort geschickt wird die nur die unterstützten Funktionen enthält. Wenn man auf einen solchen Button klickt, wird eine "callback_query" geschickt.

def on_chat_message(msg):
    content_type, chat_type, chat_id = telepot.glance(msg)
    print(content_type, chat_type, chat_id)

    helptext = "Verfügbare Kommandos"
    
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
                   [ InlineKeyboardButton(text='📸 Cam snapshot', callback_data='cam_snap'),
                     InlineKeyboardButton(text='❓ Cam status', callback_data='cam_status')
                   ],
                   [ InlineKeyboardButton(text='📼 Cam an', callback_data='cam_start'),
                     InlineKeyboardButton(text='📴 Cam aus', callback_data='cam_pause')
                   ],
               ])

    bot.sendMessage(chat_id, helptext, reply_markup=keyboard)

Die main() Funktion muss daher nun zwischen normalem Chat und der Callback_query unterscheiden.

...
 MessageLoop(bot, {'chat': on_chat_message,
                   'callback_query': on_callback_query}).run_as_thread()
...

Das Mapping könnte dann so aussehen:

def on_callback_query(msg):
    global global_temperature
    global global_wass_temperature

    antwort='Irgendwas ist schief gelaufen.'
    query_id, from_id, query_data = telepot.glance(msg, flavor='callback_query')
    print('Callback Query:', query_id, from_id, query_data)

    if query_data == 'cam_snap':
        r =requests.get('http://127.0.0.1:8080/0/action/snapshot')
        antwort=r.text
    elif query_data == 'cam_pause':
        r =requests.get('http://127.0.0.1:8080/0/detection/pause')
        antwort=r.text
    elif query_data == 'cam_start':
        r =requests.get('http://127.0.0.1:8080/0/detection/start')
        antwort=r.text
    elif query_data == 'cam_status':
        r =requests.get('http://127.0.0.1:8080/0/detection/status')
        antwort=r.text

    bot.answerCallbackQuery(query_id, text=antwort, show_alert=0)

# Das Ergebnis

Nachdem die Kamera ordentlich ausgerichtet war und die Schwellwerte eingestellt waren, bekomme ich nun eine Momentaufnahme als Nachricht via Telegram. In dieser Telegram Gruppe sind dann auch die weiteren Familienmitglieder und alle entsprechend informiert. Alles in Allem, kein riesen Aufwand und ein Ergebnis das sich im wahrsten Sinne des Wortes sehen lassen kann. Insbesondere die Telegram Integration und die vereinfachte Benutzung mittels der Buttons bereitet Freude ohne die Sicherheit zu gefährden.

# Wie gehts weiter

Da ich die "on_motion_detected" Ereignisse ebenfalls an den MQTT Bus schicken, könnte ich mit Grafana eine Art Aktivitäten Protokoll erstellen und so zeitlich mit anderen Metriken korrelieren. Das Event sieht so in etwa aus:

fishberry/cam/0/motion_detected {"pixel": 15267,"noise": 13, "motionwidth": 60, "motionheight": 404, "xcoord": 124, "ycoord": 538 }

Natürlich müssen API Token und ChatID aus dem Programm Code entfernt und noch sicher ein paar andere Verschönerungen durchgeführt werden.

Ich werde das Script entsprechend erweitern, wie immer unter https://www.github.com/scurth (opens new window) als open-source veröffentlichen.

Last Updated: 31.1.2021, 14:53:18