Human detection with Ubiquiti’s UniFi Video and Python

Some months ago, I installed a couple of video cameras from Ubiquiti’s UniFi Video series, and I think they are doing a pretty decent job for the price!

I played around with their motion detection settings, and soon realised that I got a shitload of notifications from leaves flying around. The idea was to get a notification when people entered the camera area, not every other minute on a windy day.

Then I started to look around for alternatives. Of course there’s the option to buy a camera with built-in person detection, but why not try to make the software myself? The cameras are doing a great job, I just needed to write something able to find those humans! 🙂

I asked Google what to do, and found out that Python together with OpenCV could do the job. I had never really programmed anything from scratch in Python, but hey, how hard could it be?

The goal of the program is the following:

  • Download a snapshot from each cam (in a loop)
  • Scan the snapshots for humans
    • If a human is detected
      • Send me a push notification with the image
      • Clean up the temp images
  • Integrate with Home Assistant, to disable notifications if I’m home (can be overridden)

*NB* The Home Assistant part is not covered in this article *NB*

How does this OpenCV work, and how do we detect humans?

I found this guy Adrian at pyimagesearch, who has a tutorial with Python, OpenCV and something called YOLO (You Only Look Once, not to be confused with You Only Live Once) to detect objects in pictures and videos, and thought I could use that.

Pyimagesearch’s YOLO tutorial can be found here
Pyimagesearch’s guide of how to install OpenCV with Python 3 on Ubuntu can be found here

So, I installed OpenCV and Python, and verified the setup with the tutorial sample.
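A quick way to sanity-check that the OpenCV part of the setup works (just a minimal check, nothing specific to this project):

import cv2

# print the installed OpenCV version
print("OpenCV version:", cv2.__version__)

# the YOLO code later on relies on the dnn module, so make sure it's there
assert hasattr(cv2, "dnn")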

Access to the Ubiquiti NVR system

Next thing, how do I get the videos from the UniFi system?

The NVR does not save the files locally in a structured way; when you access the website or the app, the system assembles the videos for you. So we have to download the videos through the system to get the entire recording.

The NVR has a REST API. It’s not very well documented, but after playing around I figured out how it works.

First of all, create a new user to be used only for API access. The user should only have “View” access to the cams, but both “View” and “Edit” access to the recordings. Make sure that “Allow API Usage” is turned on, and copy the API Key; we are going to use that for all API requests.

With the API Key it’s pretty straightforward; the API URLs are more or less the same as the URLs you see when browsing around the NVR.

I use Postman to test API calls, but you can use whatever you prefer (for the GET requests the browser is just fine).

A couple of useful API Calls:

List all cameras:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/camera?apiKey=<apikey>  (GET request)

List all users:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/user?apiKey=<apikey>  (GET request)

List all recordings:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording?apiKey=<apikey>  (GET request)

Download recording:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording/<recordingid>/download?apiKey=<apikey>  (GET request)

Mark recording as locked:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording/<recordingid>?apiKey=<apikey> | JSON encoded body: { "locked": true } (PUT request)
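Just to sketch how those calls look from Python with requests (the same library the scripts below use); the host, port, API key and recording id are placeholders:

import requests

# self-signed cert on the NVR, so skip verification and silence the warnings
requests.packages.urllib3.disable_warnings()

base = "https://<ubiquiti-nvr-ip>:<port>/api/2.0"
apiKey = "<apikey>"

# list all cameras (GET)
cams = requests.get(base + "/camera", params={"apiKey": apiKey}, verify=False).json()["data"]
for cam in cams:
    print(cam["_id"], cam["name"])

# mark a recording as locked (PUT with a JSON body)
r = requests.put(base + "/recording/<recordingid>",
                 params={"apiKey": apiKey},
                 json={"locked": True},
                 verify=False)
print(r.status_code)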

Python time

I’m going to create the following files:

  • getsnaps.py
    • Download screenshot from each camera
  • sendpush.py
    • Send a push message
  • checkha.py
    • Ask Home Assistant if I’m home or not
    • Check if notifications should be sent even when I’m home
  • snap.py
    • Combine everything, and also do the detection 🙂
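Assuming everything lives in one project folder, the layout ends up roughly like this (the yolo-coco folder with the weights, config and class names comes from the pyimagesearch tutorial, and the snaps folder is created further down):

Python/
    snap.py
    getsnaps.py
    sendpush.py
    checkha.py
    snaps/
    yolo-coco/
        yolov3.weights
        yolov3.cfg
        coco.names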

Let’s start with getsnaps.py:

# USAGE
# python getsnaps.py

# DESCRIPTION
# This will connect to the specified NVR, grab a snapshot from each cam, download them locally and return the file locations.

# import the necessary packages
import requests
import json
import time
import os
from datetime import datetime
import shutil

def getsnaps():
    #Basic Vars
    base = "https://<nvr-ip>:<port>/api/2.0/camera/"
    snaprl = "https://<nvr-ip>:<port>/api/2.0/snapshot/camera/"
    apiKey = "?apiKey=XXXXXX"

    downloadDir = "snaps/"

    #Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    #Get the url
    r = requests.get(base+apiKey, verify=False)

    #Parse it to json
    items = r.json()["data"]
    files = []

    for x in items:
        CamName = x["name"]
        CamID = x["_id"]

        url = snaprl+CamID+apiKey+"&force=true"
        d = datetime.now()
        curUnixtime = time.mktime(d.timetuple())
        curUnixtime = str(curUnixtime)[0:10]

        response = requests.get(url, stream=True, verify=False)
        with open(downloadDir+CamName+'_'+str(int(curUnixtime))+'.png', 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
        del response
        files.append(downloadDir+CamName+'_'+str(int(curUnixtime))+'.png')

    return files
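To test it on its own before wiring everything together, something like this should do (snap.py does the real calling later):

import getsnaps

# download one snapshot per camera and print where they ended up
for f in getsnaps.getsnaps():
    print("downloaded", f)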

Then sendpush.py: (I use Pushover to accomplish this)

# USAGE
# python sendpush.py

# DESCRIPTION
# Send a push notification with an attached image via Pushover

# import the necessary packages
import requests

base = "https://api.pushover.net/1/messages.json"
token = "" #Token
userKey = "" #User key

def sendpush(img):
    #Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    values = {'token':token, 'user':userKey, 'message':'Human detected!'}
    files = {'attachment': open(img,'rb')}

    r = requests.post(base, data = values, files=files, verify=False)
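A one-off test could look like this (the file name is just a placeholder, any local image will do):

import sendpush

# push a test image to Pushover
sendpush.sendpush("snaps/test.png")  # placeholder path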

Next is getting info from HA.
checkha.py:

# USAGE
# python checkha.py

# DESCRIPTION
# This will connect to the specified Home Assistant
# There are two functions: the first checks if the user is home or not.
# The second checks if a notification override is set, which sends notifications even if the user is home.

# import the necessary packages
import requests
import json
import time
import os
from datetime import datetime

def checkhome():
    #Basic Vars

    base = "https://<HA-URL>:<port>"
    url = "/api/states/device_tracker.marc_iphone"
    apiKey = "" #HA API Key

    headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer '+apiKey
            }

    #Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    #Get the url
    r = requests.get(base+url, headers=headers, verify=False)

    #Parse it to json
    items = r.json()

    return items["state"]

def checkcam():
    #Basic Vars

    base = "https://<HA-URL>:<port>"
    url = "/api/states/input_boolean.notify_cam"
    apiKey = "" #HA API Key

    headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer '+apiKey
            }

    #Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    #Get the url
    r = requests.get(base+url, headers=headers, verify=False)

    #Parse it to json
    items = r.json()

    return items["state"]
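Both functions just return the state string from Home Assistant, so a quick test could be:

import checkha

print("home state:", checkha.checkhome())      # e.g. "home" or "not_home"
print("notify override:", checkha.checkcam())  # "on" or "off"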

And now snap.py (the detection part is based on the pyimagesearch YOLO tutorial):

# USAGE
# python snap.py

import imutils
import numpy as np
import time
import cv2
import os
import getsnaps
import sendpush
import random
import checkha
import datetime
import logging

# Construct the variables
yoloDir = "yolo-coco"
setConfidence = float(0.5)
setThreshold = float(0.3)
detectFor = ["person", "dog", "car"]
imgName = ""
cooldown = {}

# Setup basic logging
logging.basicConfig(filename='snap.log',level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# load the COCO class labels our YOLO model was trained on
labelsPath = os.path.sep.join([yoloDir, "coco.names"])
LABELS = open(labelsPath).read().strip().split("\n")

# initialize a list of colors to represent each possible class label
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3),
	dtype="uint8")

# derive the paths to the YOLO weights and model configuration
weightsPath = os.path.sep.join([yoloDir, "yolov3.weights"])
configPath = os.path.sep.join([yoloDir, "yolov3.cfg"])

# load our YOLO object detector trained on COCO dataset (80 classes)
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)


while True:
    now = datetime.datetime.now()
    logging.info("Time for current run is: "+now.strftime("%Y-%m-%d %H:%M"))

    # If I'm home and the notify override (checkcam) is off, wait 5 minutes and check again
    if checkha.checkhome() == "home" and checkha.checkcam() == "off":
        logging.info("Marc is home, skipping!")
        logging.info("Sleeping for 5 minutes!")
        time.sleep(300)
        continue
    logging.info("Marc is not home, scanning!")

    # Grab the snapshots
    snaps = getsnaps.getsnaps()
    count = 0

    # loop through the snaps
    for snap in snaps:
        camName = snap.split("/")[1].split("_")[0]
        curUnixtime = time.mktime(now.timetuple())
        curUnixtime = int(str(curUnixtime)[0:10])
        if not camName in cooldown:
            cooldown[camName] = 0
        cooltime = cooldown[camName]
        oneAgoUnixtime = int(curUnixtime) - 2 * 60

        # if there was a detection for this cam within the last 2 minutes, skip the cam to avoid spam
        if not cooltime < oneAgoUnixtime:
            logging.info("Under cooldown, skipping this cam")
            os.remove(snap)
            continue
        frame = cv2.imread(snap,1)

        (H, W) = frame.shape[:2]

        # determine only the *output* layer names that we need from YOLO
        ln = net.getLayerNames()
        ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]

        # construct a blob from the input image and then perform a forward
        # pass of the YOLO object detector, giving us our bounding boxes and
        # associated probabilities
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416),
                swapRB=True, crop=False)
        net.setInput(blob)
        start = time.time()
        layerOutputs = net.forward(ln)
        end = time.time()

        # show timing information on YOLO
        print("[INFO] YOLO took {:.6f} seconds".format(end - start))

        # initialize our lists of detected bounding boxes, confidences, and
        # class IDs, respectively
        boxes = []
        confidences = []
        classIDs = []

        # loop over each of the layer outputs
        for output in layerOutputs:
            # loop over each of the detections
            for detection in output:
                # extract the class ID and confidence (i.e., probability) of
                # the current object detection
                scores = detection[5:]
                classID = np.argmax(scores)
                confidence = scores[classID]

                # filter out weak predictions by ensuring the detected
                # probability is greater than the minimum probability
                if confidence > setConfidence:
                    # scale the bounding box coordinates back relative to the
                    # size of the image, keeping in mind that YOLO actually
                    # returns the center (x, y)-coordinates of the bounding
                    # box followed by the boxes' width and height
                    box = detection[0:4] * np.array([W, H, W, H])
                    (centerX, centerY, width, height) = box.astype("int")

                    # use the center (x, y)-coordinates to derive the top and
                    # and left corner of the bounding box
                    x = int(centerX - (width / 2))
                    y = int(centerY - (height / 2))

                    # update our list of bounding box coordinates, confidences,
                    # and class IDs
                    boxes.append([x, y, int(width), int(height)])
                    confidences.append(float(confidence))
                    classIDs.append(classID)

        # apply non-maxima suppression to suppress weak, overlapping bounding
        # boxes
        idxs = cv2.dnn.NMSBoxes(boxes, confidences, setConfidence, setThreshold)

        # ensure at least one detection exists
        if len(idxs) > 0:

            # loop over the indexes we are keeping
            for i in idxs.flatten():
                # extract the bounding box coordinates
                (x, y) = (boxes[i][0], boxes[i][1])
                (w, h) = (boxes[i][2], boxes[i][3])

                getObject = LABELS[classIDs[i]]

                if getObject in detectFor:

                    # draw a bounding box rectangle and label on the image
                    color = [int(c) for c in COLORS[classIDs[i]]]
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                    text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i])
                    cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, color, 2)

                    if getObject == "person":
                        if confidences[i] > 0.80:
                            # Send push
                            logging.warning("HOOOMAN DETECTED!")
                            imgName = "snaps/match"+str(random.randint(0,10000))+"-%d.jpg" % count
                            cv2.imwrite(imgName, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
                            count = count + 1

                            cooldown[camName] = curUnixtime
                            sendpush.sendpush(imgName)
                            os.remove(imgName)
                            imgName = ""

        os.remove(snap)
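One thing to be aware of: in newer OpenCV 4.x releases net.getUnconnectedOutLayers() returns a flat array of indices instead of a list of one-element arrays, which breaks the ln[i[0] - 1] line above. A version-tolerant variant could look like this (a drop-in replacement for the two lines that compute the layer names):

ln = net.getLayerNames()
# flatten() handles both the old (list of 1-element arrays) and new (flat array) return types
ln = [ln[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]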

And that’s more or less it!

You need to create the following folder:

  • snaps (image folder)

And then, just set up the script as a service by creating a new file /etc/systemd/system/snaps.service:

[Unit]
Description=My Snap Script Service
After=multi-user.target

[Service]
Type=idle
WorkingDirectory=/home/youruser/Python/
ExecStart=/home/youruser/.virtualenvs/cv/bin/python /home/youruser/Python/snap.py
Restart=always

[Install]
WantedBy=multi-user.target

Then reload the daemon, enable, start it and check the status:

sudo systemctl daemon-reload
sudo systemctl enable snaps.service
sudo systemctl start snaps.service
sudo systemctl status snaps.service

You’re done!

Example of the push message:
