Add perspective correction via ArUco markers

This commit is contained in:
Davide Depau 2023-09-17 17:35:07 +02:00
parent f721ac9ab5
commit 092cdd6de9
4 changed files with 208 additions and 39 deletions

46
aruco_realtime.py Normal file
View file

@ -0,0 +1,46 @@
"""Live preview of ArUco marker detection on an RTSP video stream.

Usage: python aruco_realtime.py <rtsp_url>

Reads frames from the given RTSP stream, detects 6x6 ArUco markers
(dictionary DICT_6X6_250), overlays any detections on the frame, and
shows the result at half size until 'q' is pressed or the stream ends.
"""

import sys

import cv2
from cv2 import aruco

# ArUco dictionary and detector parameters used for every frame.
# Use the `aruco` submodule consistently (it was imported above).
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_6X6_250)
aruco_params = aruco.DetectorParameters()

# The RTSP URL comes from the first command-line argument.
rtsp_url = sys.argv[1]

# Capture the video stream from the RTSP URL.
cap = cv2.VideoCapture(rtsp_url)

while True:
    ret, frame = cap.read()
    # A None frame means the read failed or the stream ended.
    if frame is None:
        print("Empty frame, exiting...")
        break

    # Marker detection operates on a grayscale image.
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    corners, ids, _rejected = aruco.detectMarkers(
        gray, aruco_dict, parameters=aruco_params
    )

    # If some markers are found, draw them on the frame.
    if ids is not None:
        frame = aruco.drawDetectedMarkers(frame, corners, ids)

    # Scale frame in half so the preview fits on screen.
    frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

    # Display the frame.
    cv2.imshow("Frame", frame)

    # Wait for the 'q' key to exit the loop.
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release the capture object and close all OpenCV windows.
cap.release()
cv2.destroyAllWindows()

187
bot.py
View file

@ -1,13 +1,18 @@
import asyncio import asyncio
import os import os
import time
import traceback import traceback
import warnings import warnings
from typing import cast, Optional from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import cast, List
import aiohttp import aiohttp
import cv2 import cv2
import numpy as np
import telegram import telegram
from aiohttp import BasicAuth from aiohttp import BasicAuth
from cv2 import aruco
from pytapo import Tapo from pytapo import Tapo
from telegram import Update, Message from telegram import Update, Message
from telegram.ext import Updater from telegram.ext import Updater
@ -32,6 +37,7 @@ class Bot:
self.me = None self.me = None
self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password) self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password)
self.executor = ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0)))
def _get_presets(self): def _get_presets(self):
presets = self.tapo.getPresets() presets = self.tapo.getPresets()
@ -56,14 +62,133 @@ class Bot:
) as resp: ) as resp:
return await resp.text() return await resp.text()
def _take_photo(self) -> Optional[bytes]: def _take_photo_blocking(
self,
adjust_perspective=True,
timeout: float = 5.0,
) -> List[cv2.typing.MatLike]:
# Prepare the camera
print("Disabling privacy mode and setting auto day/night mode...")
self.tapo.setPrivacyMode(False)
self.tapo.setDayNightMode("auto")
time.sleep(1)
# Take the color image
vcap = cv2.VideoCapture( vcap = cv2.VideoCapture(
f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1" f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1"
) )
ret, frame = vcap.read() print("Taking color image...")
ret, pretty_image = vcap.read()
while pretty_image is None:
ret, pretty_image = vcap.read()
self.tapo.setDayNightMode("on")
if not adjust_perspective:
return [pretty_image]
# Iterate until we find all 4 aruco markers or timeout
aruco_corners = {}
annotated_image = None
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_6X6_250)
aruco_params = aruco.DetectorParameters()
t0 = time.time()
print("Taking image with ArUco markers...")
while len(aruco_corners) < 4:
if time.time() - t0 > timeout:
print(
"Timeout waiting for ArUco markers, returning only original image"
)
return [pretty_image]
ret, annotated_image = vcap.read()
if not ret: if not ret:
return None continue
return frame
# Detect the markers
corners, ids, rejected = aruco.detectMarkers(
annotated_image, aruco_dict, parameters=aruco_params
)
for corner, i in zip(corners, ids):
aruco_corners[i[0]] = corner
assert annotated_image is not None
del vcap
print(
"Found all ArUco markers, enabled privacy mode and set auto day/night mode..."
)
self.tapo.setDayNightMode("auto")
self.tapo.setPrivacyMode(True)
corners = [aruco_corners[i] for i in range(1, 5)]
ids = np.array([[i] for i in range(1, 5)])
aruco.drawDetectedMarkers(annotated_image, corners, ids)
# Annotate the image with the detected markers and apply the perspective transform to the pretty image
# Get the outermost points of each marker
bl_marker = corners[0].squeeze()[3] # bottom left marker
tl_marker = corners[1].squeeze()[0] # top left marker
tr_marker = corners[2].squeeze()[1] # top right marker
bc_marker = corners[3].squeeze()[3] # bottom center marker
# Calculate the fourth point by computing the line through the bottom markers and intersecting with the vertical
# line through the top right marker
slope = (bc_marker[1] - bl_marker[1]) / (bc_marker[0] - bl_marker[0])
y_intersection = slope * (tr_marker[0] - bc_marker[0]) + bc_marker[1]
fourth_point = [tr_marker[0], y_intersection]
rectangle_points = np.array(
[tl_marker, bl_marker, fourth_point, tr_marker], dtype="float32"
)
# Expand the rectangle slightly
centroid = np.mean(rectangle_points, axis=0)
expanded_rectangle_points = (
rectangle_points + (rectangle_points - centroid) * 0.025
)
# Draw the expanded rectangle on the annotated image
cv2.polylines(
annotated_image,
[np.int32(expanded_rectangle_points)],
isClosed=True,
color=(0, 255, 0),
thickness=3,
)
# Define destination points for perspective transform, maintaining a 3:2 aspect ratio
width, height = 300 * 5, 200 * 5
dst_pts = np.array(
[[0, 0], [0, height], [width, height], [width, 0]], dtype="float32"
)
matrix = cv2.getPerspectiveTransform(expanded_rectangle_points, dst_pts)
warped = cv2.warpPerspective(pretty_image, matrix, (width, height))
return [warped, annotated_image]
async def _photo_command(self, chat_id: int, adjust_perspective: bool = True):
await self.bot.send_chat_action(chat_id=chat_id, action="upload_photo")
photos = await self.take_photo(adjust_perspective=adjust_perspective)
jpegs = [cv2.imencode(".jpg", photo)[1].tobytes() for photo in photos]
media = [
telegram.InputMediaPhoto(
media=telegram.InputFile(jpeg, filename=f"photo{i}.jpg", attach=True),
filename=f"photo{i}.jpg",
)
for i, jpeg in enumerate(jpegs)
]
await self.bot.send_media_group(
chat_id=chat_id,
media=media,
caption=str(datetime.now().strftime("%A, %B %d, %Y %H:%M:%S")),
)
async def parse_message(self, msg: Message): async def parse_message(self, msg: Message):
match msg.text: match msg.text:
@ -113,41 +238,27 @@ class Bot:
text=f"Light is {state}", text=f"Light is {state}",
) )
case "/photo": case "/photo":
await self.bot.send_chat_action( await self._photo_command(msg.chat_id, adjust_perspective=True)
chat_id=msg.chat_id, action="upload_photo" case "/photo_unprocessed":
) await self._photo_command(msg.chat_id, adjust_perspective=False)
light_state = await self._get_item_state() async def take_photo(
if light_state == "OFF": self, adjust_perspective=True, timeout=5.0
) -> List[cv2.typing.MatLike]:
item_state = await self._get_item_state()
if item_state == "OFF":
print("Turning light on") print("Turning light on")
await self._send_item_command("ON") await self._send_item_command("ON")
print("Disabling privacy mode") try:
self.tapo.setPrivacyMode(False) return await asyncio.get_event_loop().run_in_executor(
await asyncio.sleep(2) self.executor, self._take_photo_blocking, adjust_perspective, timeout
print("Taking photo") )
photo = self._take_photo() finally:
print("Enabling privacy mode") if item_state == "OFF":
self.tapo.setPrivacyMode(True)
if light_state == "OFF":
print("Turning light back off") print("Turning light back off")
await self._send_item_command("OFF") await self._send_item_command("OFF")
if photo is None:
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Error taking photo",
)
return
# Encode photo to jpeg
photo = cv2.imencode(".jpg", photo)[1].tobytes()
await self.bot.send_photo(
chat_id=msg.chat_id,
photo=telegram.InputFile(photo, filename="photo.jpg"),
)
async def run(self): async def run(self):
async with self.bot: async with self.bot:
self.me = await self.bot.get_me() self.me = await self.bot.get_me()
@ -155,6 +266,7 @@ class Bot:
updater = Updater(bot=self.bot, update_queue=asyncio.Queue()) updater = Updater(bot=self.bot, update_queue=asyncio.Queue())
async with updater: async with updater:
queue = await updater.start_polling(allowed_updates=[Update.MESSAGE]) queue = await updater.start_polling(allowed_updates=[Update.MESSAGE])
print("Bot is up and running")
while True: while True:
# noinspection PyBroadException # noinspection PyBroadException
@ -164,9 +276,18 @@ class Bot:
if not upd.message or upd.message.chat_id != self.chat_id: if not upd.message or upd.message.chat_id != self.chat_id:
print("Ignoring message") print("Ignoring message")
continue continue
# noinspection PyBroadException
try:
await self.parse_message(upd.message) await self.parse_message(upd.message)
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
exc = traceback.format_exc()
await self.bot.send_message(
chat_id=self.chat_id,
text=f"Error: {exc}",
)
except Exception:
traceback.print_exc()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -4,3 +4,4 @@ reposition - Re-aim the camera
light_on - Turn on the light light_on - Turn on the light
light_off - Turn off the light light_off - Turn off the light
light_status - Check the status of the light light_status - Check the status of the light
photo_unprocessed - Take a photo of the grocery list board without processing

View file

@ -1,4 +1,5 @@
pytapo pytapo
python-telegram-bot python-telegram-bot
opencv-python-headless opencv-python-headless
opencv-contrib-python-headless
aiohttp aiohttp