Human detection with Ubiquiti’s UniFi Video and Python
Some months ago, I installed a couple of video cams from Ubiquiti’s UniFi series, and I think they are doing a pretty decent job for the price!
I played around with their motion detection settings, and soon realised that I got a ton of notifications caused by leaves flying around. The idea was to get a notification if people entered the camera area, not every other minute on a windy day.
Then I started to look around for alternatives. Of course there’s the option to buy a camera with built-in human detection, but why not try to make the software myself? The cameras are doing a great job, I just needed to write something able to find those humans! 🙂
I asked Google what to do, and found out that Python together with OpenCV could do the job. I have never really programmed anything from scratch in Python, but hey, how hard could it be?
The goal of the program is the following:
- Download a screenshot for each cam (looping around)
- Scan the screenshots for humans
- If a human is detected
  - Send me a push notification with the image
  - Clean up the temp images
- Integrate with Home Assistant to disable notifications if I’m home (can be overridden)
*NB* The Home Assistant part is not covered in this article *NB*
How does this OpenCV work, and how do we detect humans?
I found this guy Adrian at pyimagesearch, who had a tutorial with Python, OpenCV and something called YOLO (You Only Look Once, not to be confused with You Only Live Once) to detect objects in pictures and videos, and thought I could use that.
Pyimagesearch’s YOLO tutorial can be found here
Pyimagesearch’s guide on how to install OpenCV with Python 3 on Ubuntu can be found here
So, I installed OpenCV and Python, and verified the setup with the tutorial-sample.
Access to the Ubiquiti NVR system
Next thing, how do I get the videos from the UniFi system?
The NVR does not store the files locally in a structured way. When you access the website or the app, the system assembles the videos for you, so to get an entire video we have to download it from the system.
The NVR has a REST API. It’s not very well documented, but after playing around I figured out how it works.
First of all, create a new user that is only used for API access. The user should only have “View” access to the cams, but both “View” and “Edit” access to the recordings. Make sure that “Allow API Usage” is switched on, and copy the API key; we are going to use that for all API requests.
With the API key, it’s pretty straightforward: the API URLs are more or less the same as the URLs you see when browsing around the NVR.
I use Postman to test API calls, but you can use whatever you prefer (for the Get requests the browser is just fine).
A couple of useful API Calls:
List all cameras:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/camera?apiKey=<apikey> (Get request)
List all users:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/user?apiKey=<apikey> (Get request)
List all recordings:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording?apiKey=<apikey> (Get request)
Download recording:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording/<recordingid>/download?apiKey=<apikey> (Get request)
Mark recording as locked:
https://<ubiquiti-nvr-ip>:<port>/api/2.0/recording/<recordingid>?apiKey=<apikey> | JSON encoded body: { "locked": true } (Put request)
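All of the calls above follow the same pattern, so a small helper can build the URLs for you. This is just a minimal sketch based on the endpoint paths listed above; the function name `nvr_url` and the example host, port and key are placeholders of mine, not something the NVR provides:

```python
from urllib.parse import urlencode


def nvr_url(host, port, path, api_key, **params):
    """Build an NVR API URL such as .../api/2.0/camera?apiKey=...

    `path` is the part after /api/2.0/, e.g. "camera" or
    "recording/<recordingid>/download". Extra keyword arguments
    are appended as additional query parameters.
    """
    query = urlencode({"apiKey": api_key, **params})
    return "https://{}:{}/api/2.0/{}?{}".format(host, port, path.strip("/"), query)


# Placeholder values, for illustration only
print(nvr_url("192.0.2.10", 7443, "camera", "XXXXXX"))
print(nvr_url("192.0.2.10", 7443, "recording/abc123/download", "XXXXXX"))
```

The resulting string can be pasted into the browser or Postman, or passed to `requests.get()` in the scripts below.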
Python time
I’m going to create the following files:
- getsnaps.py
  - Download a screenshot from each camera
- sendpush.py
  - Send a push message
- checkha.py
  - Ask Home Assistant whether I’m home or not
  - Check if a notification should be sent even if I’m home
- snap.py
  - Combine everything, and also do the detection 🙂
Let’s start with getsnaps.py:
# USAGE
# python getsnaps.py

# DESCRIPTION
# This will connect to the specified NVR, grab a snapshot from all cams,
# download them locally and return the file locations.

# import the necessary packages
import requests
import time
import shutil

def getsnaps():
    # Basic vars
    base = "https://<nvr-ip>:<port>/api/2.0/camera/"
    snaprl = "https://<nvr-ip>:<port>/api/2.0/snapshot/camera/"
    apiKey = "?apiKey=XXXXXX"
    downloadDir = "snaps/"

    # Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    # Get the list of cameras
    r = requests.get(base+apiKey, verify=False)

    # Parse it to json
    items = r.json()["data"]

    files = []
    for x in items:
        CamName = x["name"]
        CamID = x["_id"]
        url = snaprl+CamID+apiKey+"&force=true"

        # File name is <camera name>_<unix timestamp>.png
        curUnixtime = int(time.time())
        filename = downloadDir+CamName+'_'+str(curUnixtime)+'.png'

        # Stream the snapshot straight to disk
        response = requests.get(url, stream=True, verify=False)
        with open(filename, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
        del response

        files.append(filename)
    return files
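The file name is more than cosmetics: snap.py later reads the camera name back out of it by splitting on "_", which would cut the name short for a camera called something like "Front_Door". Below is a small sketch of the naming convention and a reverse lookup that survives underscores. The helper names `snap_filename` and `cam_name_from` are my own and not part of the scripts in this article:

```python
import time


def snap_filename(cam_name, download_dir="snaps/", now=None):
    """Return e.g. 'snaps/Garage_1700000000.png'."""
    ts = int(now if now is not None else time.time())
    return "{}{}_{}.png".format(download_dir, cam_name, ts)


def cam_name_from(path):
    """Recover the camera name, even if it contains underscores."""
    filename = path.rsplit("/", 1)[-1]
    # Split on the LAST underscore only; everything before it is the name
    name, _, _timestamp = filename.rpartition("_")
    return name


path = snap_filename("Front_Door", now=1700000000)
print(path)
print(cam_name_from(path))
```

Splitting on the last underscore works because the timestamp is always the final part of the file name.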
Then sendpush.py: (I use Pushover to accomplish this)
# USAGE
# python sendpush.py

# DESCRIPTION
# Send a push message with the image attached to my devices (via Pushover)

# import the necessary packages
import requests

base = "https://api.pushover.net/1/messages.json"
token = "" #Token
userKey = "" #User key

def sendpush(img):
    # Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    values = {'token': token, 'user': userKey, 'message': 'Human detected!'}

    # Open the image in a with-block so the file handle gets closed again
    with open(img, 'rb') as attachment:
        files = {'attachment': attachment}
        r = requests.post(base, data=values, files=files, verify=False)
Next is getting info from HA.
checkha.py:
# USAGE
# python checkha.py

# DESCRIPTION
# This will connect to the specified Home Assistant
# There are two functions, the first is to check if the user is home or not.
# The next is to check if a notification override is set, which ignores if the user is home.

# import the necessary packages
import requests

def checkhome():
    # Basic vars
    base = "https://<HA-URL>:<port>"
    url = "/api/states/device_tracker.marc_iphone"
    apiKey = "" #HA API Key
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer '+apiKey
    }

    # Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    # Get the url
    r = requests.get(base+url, headers=headers, verify=False)

    # Parse it to json
    items = r.json()
    return items["state"]

def checkcam():
    # Basic vars
    base = "https://<HA-URL>:<port>"
    url = "/api/states/input_boolean.notify_cam"
    apiKey = "" #HA API Key
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer '+apiKey
    }

    # Disable SSL warnings for now
    requests.packages.urllib3.disable_warnings()

    # Get the url
    r = requests.get(base+url, headers=headers, verify=False)

    # Parse it to json
    items = r.json()
    return items["state"]
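The two states returned above get combined into a single "scan or skip" decision in snap.py. Here is a minimal sketch of that decision as a standalone function. The name `should_scan` is mine, and I’m assuming the state strings are "home" for the device tracker and "on"/"off" for the input boolean, as used in this article:

```python
def should_scan(home_state, override_state):
    """Return True if the cameras should be scanned.

    Scanning is skipped only when the tracker reports "home"
    AND the notification override is "off".
    """
    return not (home_state == "home" and override_state == "off")


print(should_scan("home", "off"))      # home, no override
print(should_scan("home", "on"))       # home, but override is set
print(should_scan("not_home", "off"))  # away
```

Note the use of `==` rather than `in`: with strings, `in` is a substring test, so an empty state would count as being "home".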
And now snap.py: (Reference)
# USAGE
# python snap.py

import imutils
import numpy as np
import time
import cv2
import os
import getsnaps
import sendpush
import random
import checkha
import datetime
import logging

# Construct the variables
yoloDir = "yolo-coco"
setConfidence = float(0.5)
setThreshold = float(0.3)
detectFor = ["person", "dog", "car"]
imgName = ""
cooldown = {}

# Setup basic logging
logging.basicConfig(filename='snap.log',level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# load the COCO class labels our YOLO model was trained on
labelsPath = os.path.sep.join([yoloDir, "coco.names"])
LABELS = open(labelsPath).read().strip().split("\n")

# initialize a list of colors to represent each possible class label
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(len(LABELS), 3), dtype="uint8")

# derive the paths to the YOLO weights and model configuration
weightsPath = os.path.sep.join([yoloDir, "yolov3.weights"])
configPath = os.path.sep.join([yoloDir, "yolov3.cfg"])

# load our YOLO object detector trained on COCO dataset (80 classes)
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)

while True:
    now = datetime.datetime.now()
    logging.info("Time for current run is: "+now.strftime("%Y-%m-%d %H:%M"))

    # Check if home; if so, wait 5 minutes and check again (only if checkcam is off)
    # (== instead of "in": "in" on strings is a substring test)
    if checkha.checkhome() == "home" and checkha.checkcam() == "off":
        logging.info("Marc is home, skipping!")
        logging.info("Sleeping for 5 minutes!")
        time.sleep(300)
        continue

    logging.info("Marc is not home, scanning!")

    # Grab the snapshots
    snaps = getsnaps.getsnaps()
    count = 0

    # loop through the snaps
    for snap in snaps:
        camName = snap.split("/")[1].split("_")[0]
        curUnixtime = int(time.mktime(now.timetuple()))
        if camName not in cooldown:
            cooldown[camName] = 0
        cooltime = cooldown[camName]
        oneAgoUnixtime = curUnixtime - 2 * 60

        # if there was a detection for this cam within the last 2 minutes,
        # skip the cam to avoid spam
        if not cooltime < oneAgoUnixtime:
            logging.info("Under cooldown, skipping this cam")
            os.remove(snap)
            continue

        frame = cv2.imread(snap, 1)
        (H, W) = frame.shape[:2]

        # determine only the *output* layer names that we need from YOLO
        # (flatten() copes with both the nested and the flat index arrays
        # that different OpenCV versions return)
        ln = net.getLayerNames()
        ln = [ln[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]

        # construct a blob from the input image and then perform a forward
        # pass of the YOLO object detector, giving us our bounding boxes and
        # associated probabilities
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
        net.setInput(blob)
        start = time.time()
        layerOutputs = net.forward(ln)
        end = time.time()

        # show timing information on YOLO
        print("[INFO] YOLO took {:.6f} seconds".format(end - start))

        # initialize our lists of detected bounding boxes, confidences, and
        # class IDs, respectively
        boxes = []
        confidences = []
        classIDs = []

        # loop over each of the layer outputs
        for output in layerOutputs:
            # loop over each of the detections
            for detection in output:
                # extract the class ID and confidence (i.e., probability) of
                # the current object detection
                scores = detection[5:]
                classID = np.argmax(scores)
                confidence = scores[classID]

                # filter out weak predictions by ensuring the detected
                # probability is greater than the minimum probability
                if confidence > setConfidence:
                    # scale the bounding box coordinates back relative to the
                    # size of the image, keeping in mind that YOLO actually
                    # returns the center (x, y)-coordinates of the bounding
                    # box followed by the boxes' width and height
                    box = detection[0:4] * np.array([W, H, W, H])
                    (centerX, centerY, width, height) = box.astype("int")

                    # use the center (x, y)-coordinates to derive the top
                    # and left corner of the bounding box
                    x = int(centerX - (width / 2))
                    y = int(centerY - (height / 2))

                    # update our list of bounding box coordinates, confidences,
                    # and class IDs
                    boxes.append([x, y, int(width), int(height)])
                    confidences.append(float(confidence))
                    classIDs.append(classID)

        # apply non-maxima suppression to suppress weak, overlapping bounding
        # boxes
        idxs = cv2.dnn.NMSBoxes(boxes, confidences, setConfidence, setThreshold)

        # ensure at least one detection exists
        if len(idxs) > 0:
            # loop over the indexes we are keeping
            for i in idxs.flatten():
                # extract the bounding box coordinates
                (x, y) = (boxes[i][0], boxes[i][1])
                (w, h) = (boxes[i][2], boxes[i][3])
                getObject = LABELS[classIDs[i]]

                if getObject in detectFor:
                    # draw a bounding box rectangle and label on the image
                    color = [int(c) for c in COLORS[classIDs[i]]]
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                    text = "{}: {:.4f}".format(LABELS[classIDs[i]], confidences[i])
                    cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                    if getObject == "person":
                        if confidences[i] > 0.80:
                            # Send push
                            logging.warning("HOOOMAN DETECTED!")
                            imgName = "snaps/match"+str(random.randint(0,10000))+"-%d.jpg" % count
                            cv2.imwrite(imgName, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
                            count = count + 1
                            cooldown[camName] = curUnixtime
                            sendpush.sendpush(imgName)
                            os.remove(imgName)
                            imgName = ""

        # clean up the downloaded snapshot
        os.remove(snap)
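The per-camera cooldown in snap.py is easy to get backwards, so here is the same check pulled out into a tiny function. It is only a sketch of the logic used above; the names `in_cooldown` and `COOLDOWN_SECONDS` are mine:

```python
COOLDOWN_SECONDS = 2 * 60  # same 2-minute window as in snap.py


def in_cooldown(last_detection, now, window=COOLDOWN_SECONDS):
    """Return True if the last detection is at most `window` seconds old.

    Both arguments are Unix timestamps in seconds. A camera that has
    never triggered is stored as 0, which is always outside the window.
    """
    return last_detection >= now - window


now = 1_700_000_000
print(in_cooldown(0, now))          # never triggered
print(in_cooldown(now - 30, now))   # triggered 30 seconds ago
print(in_cooldown(now - 600, now))  # triggered 10 minutes ago
```

A camera is skipped while `in_cooldown(...)` is true, which is the same condition as `not cooltime < oneAgoUnixtime` in the script.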
And that’s more or less it!
You need to create the following folder:
- snaps (image folder)
And then just set up the script as a service by creating a new file /etc/systemd/system/snaps.service:
[Unit]
Description=My Snap Script Service
After=multi-user.target

[Service]
Type=idle
WorkingDirectory=/home/youruser/Python/
ExecStart=/home/youruser/.virtualenvs/cv/bin/python /home/youruser/Python/snap.py
Restart=always

[Install]
WantedBy=multi-user.target
Then reload the daemon, enable, start it and check the status:
sudo systemctl daemon-reload
sudo systemctl enable snaps.service
sudo systemctl start snaps.service
sudo systemctl status snaps.service
You’re done!
Example of the push message: