408 lines
15 KiB
Python
408 lines
15 KiB
Python
import asyncio
|
|
import os
|
|
import time
|
|
import traceback
|
|
import warnings
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from typing import cast, List
|
|
|
|
import aiohttp
|
|
import cv2
|
|
import numpy as np
|
|
import telegram
|
|
from aiohttp import BasicAuth
|
|
from cv2 import aruco
|
|
from pytapo import Tapo
|
|
from telegram import Update, Message
|
|
from telegram.ext import Updater
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
|
|
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
|
|
|
|
|
|
class Bot:
|
|
def __init__(self):
|
|
self.token = os.environ["BOT_TOKEN"]
|
|
self.camera_ip = os.environ["CAMERA_IP"]
|
|
self.camera_user = os.environ["CAMERA_USER"]
|
|
self.camera_password = os.environ["CAMERA_PASSWORD"]
|
|
self.chat_ids = list(map(int, os.environ["CHAT_ID"].split(",")))
|
|
self.profile_name = os.environ.get("CAMERA_PROFILE_NAME", "board")
|
|
self.openhab_url = os.environ["OPENHAB_URL"]
|
|
self.openhab_token = os.environ["OPENHAB_TOKEN"]
|
|
self.openhab_item = os.environ["OPENHAB_ITEM"]
|
|
|
|
self.bot = telegram.Bot(token=self.token)
|
|
self.me = None
|
|
|
|
self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password)
|
|
self.executor = ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0)))
|
|
|
|
self.last_aruco_corners = [None] * 4
|
|
|
|
def _get_presets(self):
|
|
presets = self.tapo.getPresets()
|
|
return {v: k for k, v in presets.items()}
|
|
|
|
async def _get_item_state(self):
|
|
url = f"{self.openhab_url}/rest/items/{self.openhab_item}/state"
|
|
# use aiohttp instead of requests to avoid blocking
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, auth=BasicAuth(self.openhab_token, "")) as resp:
|
|
return await resp.text()
|
|
|
|
async def _send_item_command(self, command):
|
|
print(f"Sending command {command}")
|
|
url = f"{self.openhab_url}/rest/items/{self.openhab_item}"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
url,
|
|
auth=BasicAuth(self.openhab_token, ""),
|
|
data=command,
|
|
headers={"Content-Type": "text/plain", "Accept": "*/*"},
|
|
) as resp:
|
|
return await resp.text()
|
|
|
|
def _take_photo_blocking(
|
|
self,
|
|
adjust_perspective=True,
|
|
timeout: float = 10.0,
|
|
chat_action_fn=None,
|
|
) -> List[cv2.typing.MatLike]:
|
|
privacy_mode = self.tapo.getPrivacyMode()
|
|
|
|
# Prepare the camera
|
|
print("Disabling privacy mode and setting auto day/night mode...")
|
|
self.tapo.setPrivacyMode(False)
|
|
self.tapo.setDayNightMode("auto")
|
|
time.sleep(1)
|
|
|
|
# Take the color image
|
|
vcap = cv2.VideoCapture(
|
|
f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1"
|
|
)
|
|
if chat_action_fn:
|
|
chat_action_fn()
|
|
|
|
print("Taking color image...")
|
|
ret, pretty_image = vcap.read()
|
|
while pretty_image is None:
|
|
ret, pretty_image = vcap.read()
|
|
|
|
if not adjust_perspective:
|
|
self.tapo.setPrivacyMode(privacy_mode)
|
|
return [pretty_image]
|
|
|
|
self.tapo.setDayNightMode("on")
|
|
|
|
# Iterate until we find all 4 aruco markers or timeout
|
|
aruco_corners = [None] * 4
|
|
|
|
def found_all(corners):
|
|
return all(c is not None for c in corners)
|
|
|
|
annotated_image = None
|
|
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
|
|
aruco_params = aruco.DetectorParameters()
|
|
|
|
if chat_action_fn:
|
|
chat_action_fn()
|
|
|
|
t0 = time.time()
|
|
last_chat_action = 0
|
|
print("Taking image with ArUco markers...")
|
|
while not found_all(aruco_corners):
|
|
delta = time.time() - t0
|
|
if delta > timeout:
|
|
a = self.last_aruco_corners.copy()
|
|
for i in range(4):
|
|
if aruco_corners[i] is not None:
|
|
a[i] = aruco_corners[i]
|
|
aruco_corners = a
|
|
|
|
if found_all(aruco_corners):
|
|
print("Timeout waiting for ArUco markers, using cached corners")
|
|
break
|
|
|
|
print(
|
|
"Timeout waiting for ArUco markers, returning only original image"
|
|
)
|
|
|
|
ids, corners = zip(*filter(lambda x: x[1] is not None, enumerate(aruco_corners)))
|
|
aruco.drawDetectedMarkers(pretty_image, corners, ids)
|
|
|
|
self.tapo.setPrivacyMode(privacy_mode)
|
|
self.tapo.setDayNightMode("auto")
|
|
return [pretty_image]
|
|
|
|
if delta - last_chat_action > 3:
|
|
last_chat_action = delta
|
|
if chat_action_fn:
|
|
chat_action_fn()
|
|
|
|
ret, annotated_image = vcap.read()
|
|
if not ret:
|
|
continue
|
|
|
|
# Detect the markers
|
|
corners, ids, rejected = aruco.detectMarkers(
|
|
annotated_image, aruco_dict, parameters=aruco_params
|
|
)
|
|
if corners is not None and ids is not None:
|
|
for corner, i in zip(corners, ids):
|
|
index = i[0]
|
|
if not (0 <= index < 4):
|
|
warnings.warn(f"Invalid ArUco marker ID: {index}")
|
|
continue
|
|
aruco_corners[index] = corner
|
|
assert annotated_image is not None
|
|
|
|
del vcap
|
|
|
|
print(
|
|
"Found all ArUco markers, enabled privacy mode and set auto day/night mode..."
|
|
)
|
|
self.tapo.setDayNightMode("auto")
|
|
self.tapo.setPrivacyMode(privacy_mode)
|
|
|
|
aruco.drawDetectedMarkers(annotated_image, aruco_corners, np.array(range(4)))
|
|
|
|
# Annotate the image with the detected markers and apply the perspective transform to the pretty image
|
|
|
|
# Get the outermost points of each marker
|
|
tl_marker = aruco_corners[0].squeeze()[0] # top left marker
|
|
tr_marker = aruco_corners[1].squeeze()[1] # top right marker
|
|
bl_marker = aruco_corners[2].squeeze()[3] # bottom left marker
|
|
bc_marker = aruco_corners[3].squeeze()[2] # bottom center marker
|
|
|
|
# Calculate the fourth point by computing the line through the bottom markers and intersecting with the vertical
|
|
# line through the top right marker
|
|
slope = (bc_marker[1] - bl_marker[1]) / (bc_marker[0] - bl_marker[0])
|
|
y_intersection = slope * (tr_marker[0] - bc_marker[0]) + bc_marker[1]
|
|
fourth_point = [tr_marker[0], y_intersection]
|
|
|
|
rectangle_points = np.array(
|
|
[tl_marker, bl_marker, fourth_point, tr_marker], dtype="float32"
|
|
)
|
|
|
|
# Expand the rectangle slightly
|
|
centroid = np.mean(rectangle_points, axis=0)
|
|
expanded_rectangle_points = (
|
|
rectangle_points + (rectangle_points - centroid) * 0.025
|
|
)
|
|
|
|
# Draw the expanded rectangle on the annotated image
|
|
cv2.polylines(
|
|
annotated_image,
|
|
[np.int32(expanded_rectangle_points)],
|
|
isClosed=True,
|
|
color=(0, 255, 0),
|
|
thickness=3,
|
|
)
|
|
|
|
# Define destination points for perspective transform, maintaining a 3:2 aspect ratio
|
|
width, height = 300 * 5, 200 * 5
|
|
dst_pts = np.array(
|
|
[[0, 0], [0, height], [width, height], [width, 0]], dtype="float32"
|
|
)
|
|
|
|
matrix = cv2.getPerspectiveTransform(expanded_rectangle_points, dst_pts)
|
|
warped = cv2.warpPerspective(pretty_image, matrix, (width, height))
|
|
|
|
self.last_aruco_corners = aruco_corners
|
|
|
|
return [warped, annotated_image]
|
|
|
|
async def _photo_command(self, chat_id: int, adjust_perspective: bool = True):
|
|
async def chat_action_fn():
|
|
await self.bot.send_chat_action(chat_id=chat_id, action="upload_photo")
|
|
|
|
await chat_action_fn()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
photos = await self.take_photo(
|
|
adjust_perspective=adjust_perspective,
|
|
chat_action_fn=lambda: loop.create_task(chat_action_fn()),
|
|
)
|
|
jpegs = [cv2.imencode(".jpg", photo)[1].tobytes() for photo in photos]
|
|
|
|
media = [
|
|
telegram.InputMediaPhoto(
|
|
media=telegram.InputFile(jpeg, filename=f"photo{i}.jpg", attach=True),
|
|
filename=f"photo{i}.jpg",
|
|
)
|
|
for i, jpeg in enumerate(jpegs)
|
|
]
|
|
|
|
await self.bot.send_media_group(
|
|
chat_id=chat_id,
|
|
media=media,
|
|
caption=str(datetime.now().strftime("%A, %B %d, %Y %H:%M:%S")),
|
|
)
|
|
|
|
async def parse_message(self, msg: Message):
|
|
match msg.text:
|
|
case "/start":
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text="Hello, I'm a bot that can send you a photo from the camera.",
|
|
)
|
|
case "/reposition":
|
|
privacy_mode = self.tapo.getPrivacyMode()
|
|
self.tapo.setPrivacyMode(False)
|
|
presets = self._get_presets()
|
|
if self.profile_name not in presets:
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Profile '{self.profile_name}' not found",
|
|
)
|
|
return
|
|
self.tapo.setPreset(presets[self.profile_name])
|
|
message = await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Repositioning to profile '{self.profile_name}'...",
|
|
)
|
|
await asyncio.sleep(5)
|
|
self.tapo.setPrivacyMode(privacy_mode)
|
|
await self.bot.edit_message_text(
|
|
chat_id=msg.chat_id,
|
|
message_id=message.message_id,
|
|
text=f"Repositioning complete",
|
|
)
|
|
case "/calibrate":
|
|
privacy_mode = self.tapo.getPrivacyMode()
|
|
self.tapo.setPrivacyMode(False)
|
|
await asyncio.sleep(0.3)
|
|
self.tapo.calibrateMotor()
|
|
message = await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Calibrating, this will take ~25s...",
|
|
)
|
|
await asyncio.sleep(26)
|
|
await self.bot.edit_message_text(
|
|
chat_id=msg.chat_id,
|
|
message_id=message.message_id,
|
|
text=f"Calibration complete",
|
|
)
|
|
self.tapo.setPrivacyMode(privacy_mode)
|
|
case "/light_on":
|
|
await self._send_item_command("ON")
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Light turned on",
|
|
)
|
|
case "/light_off":
|
|
await self._send_item_command("OFF")
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Light turned off",
|
|
)
|
|
case "/light_status":
|
|
state = await self._get_item_state()
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Light is {state}",
|
|
)
|
|
case "/photo":
|
|
await self._photo_command(msg.chat_id, adjust_perspective=True)
|
|
case "/photo_unprocessed":
|
|
await self._photo_command(msg.chat_id, adjust_perspective=False)
|
|
case "/privacy_on":
|
|
self.tapo.setPrivacyMode(True)
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Privacy mode turned on",
|
|
)
|
|
case "/privacy_off":
|
|
self.tapo.setPrivacyMode(False)
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Privacy mode turned off",
|
|
)
|
|
case "/privacy_status":
|
|
state = self.tapo.getPrivacyMode()
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Privacy mode is {state and 'enabled' or 'disabled'}",
|
|
)
|
|
case "/night_mode_auto":
|
|
self.tapo.setDayNightMode("auto")
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Night mode set to auto",
|
|
)
|
|
case "/night_mode_on":
|
|
self.tapo.setDayNightMode("on")
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Night mode enabled",
|
|
)
|
|
case "/night_mode_off":
|
|
self.tapo.setDayNightMode("off")
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Night mode disabled",
|
|
)
|
|
case "/night_mode_status":
|
|
state = self.tapo.getDayNightMode()
|
|
await self.bot.send_message(
|
|
chat_id=msg.chat_id,
|
|
text=f"Night mode is {state}",
|
|
)
|
|
|
|
async def take_photo(
|
|
self, adjust_perspective=True, timeout=10.0, chat_action_fn=None
|
|
) -> List[cv2.typing.MatLike]:
|
|
item_state = await self._get_item_state()
|
|
if item_state == "OFF":
|
|
print("Turning light on")
|
|
await self._send_item_command("ON")
|
|
|
|
try:
|
|
return await asyncio.get_event_loop().run_in_executor(
|
|
self.executor,
|
|
self._take_photo_blocking,
|
|
adjust_perspective,
|
|
timeout,
|
|
chat_action_fn,
|
|
)
|
|
finally:
|
|
if item_state == "OFF":
|
|
print("Turning light back off")
|
|
await self._send_item_command("OFF")
|
|
|
|
async def run(self):
|
|
async with self.bot:
|
|
self.me = await self.bot.get_me()
|
|
|
|
updater = Updater(bot=self.bot, update_queue=asyncio.Queue())
|
|
async with updater:
|
|
queue = await updater.start_polling(allowed_updates=[Update.MESSAGE])
|
|
print("Bot is up and running")
|
|
|
|
while True:
|
|
# noinspection PyBroadException
|
|
try:
|
|
upd = cast(Update, await queue.get())
|
|
print(upd)
|
|
if not upd.message or upd.message.chat_id not in self.chat_ids:
|
|
print("Ignoring message")
|
|
continue
|
|
# noinspection PyBroadException
|
|
try:
|
|
await self.parse_message(upd.message)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
exc = traceback.format_exc()
|
|
await self.bot.send_message(
|
|
chat_id=upd.message.chat_id,
|
|
text=f"Error: {exc}",
|
|
)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
bot = Bot()
|
|
asyncio.run(bot.run())
|