import asyncio import os import time import traceback import warnings from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import cast, List import aiohttp import cv2 import numpy as np import telegram from aiohttp import BasicAuth from cv2 import aruco from pytapo import Tapo from telegram import Update, Message from telegram.ext import Updater from urllib3.exceptions import InsecureRequestWarning warnings.filterwarnings("ignore", category=InsecureRequestWarning) class Bot: def __init__(self): self.token = os.environ["BOT_TOKEN"] self.camera_ip = os.environ["CAMERA_IP"] self.camera_user = os.environ["CAMERA_USER"] self.camera_password = os.environ["CAMERA_PASSWORD"] self.chat_ids = list(map(int, os.environ["CHAT_ID"].split(","))) self.profile_name = os.environ.get("CAMERA_PROFILE_NAME", "board") self.openhab_url = os.environ["OPENHAB_URL"] self.openhab_token = os.environ["OPENHAB_TOKEN"] self.openhab_item = os.environ["OPENHAB_ITEM"] = telegram.Bot(token=self.token) = None self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password) self.executor = ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0))) self.last_aruco_corners = [None] * 4 def _get_presets(self): presets = self.tapo.getPresets() return {v: k for k, v in presets.items()} async def _get_item_state(self): url = f"{self.openhab_url}/rest/items/{self.openhab_item}/state" # use aiohttp instead of requests to avoid blocking async with aiohttp.ClientSession() as session: async with session.get(url, auth=BasicAuth(self.openhab_token, "")) as resp: return await resp.text() async def _send_item_command(self, command): print(f"Sending command {command}") url = f"{self.openhab_url}/rest/items/{self.openhab_item}" async with aiohttp.ClientSession() as session: async with url, auth=BasicAuth(self.openhab_token, ""), data=command, headers={"Content-Type": "text/plain", "Accept": "*/*"}, ) as resp: return await resp.text() def _take_photo_blocking( self, adjust_perspective=True, timeout: float = 10.0, chat_action_fn=None, ) -> List[cv2.typing.MatLike]: privacy_mode = self.tapo.getPrivacyMode() # Prepare the camera print("Disabling privacy mode and setting auto day/night mode...") self.tapo.setPrivacyMode(False) self.tapo.setDayNightMode("auto") time.sleep(1) # Take the color image vcap = cv2.VideoCapture( f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1" ) if chat_action_fn: chat_action_fn() print("Taking color image...") ret, pretty_image = while pretty_image is None: ret, pretty_image = if not adjust_perspective: self.tapo.setPrivacyMode(privacy_mode) return [pretty_image] self.tapo.setDayNightMode("on") # Iterate until we find all 4 aruco markers or timeout aruco_corners = [None] * 4 def found_all(corners): return all(c is not None for c in corners) annotated_image = None aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50) aruco_params = aruco.DetectorParameters() if chat_action_fn: chat_action_fn() t0 = time.time() last_chat_action = 0 print("Taking image with ArUco markers...") while not found_all(aruco_corners): delta = time.time() - t0 if delta > timeout: a = self.last_aruco_corners.copy() for i in range(4): if aruco_corners[i] is not None: a[i] = aruco_corners[i] aruco_corners = a if found_all(aruco_corners): print("Timeout waiting for ArUco markers, using cached corners") break print( "Timeout waiting for ArUco markers, returning only original image" ) self.tapo.setPrivacyMode(privacy_mode) self.tapo.setDayNightMode("auto") return [pretty_image] if delta - last_chat_action > 3: last_chat_action = delta if chat_action_fn: chat_action_fn() ret, annotated_image = if not ret: continue # Detect the markers corners, ids, rejected = aruco.detectMarkers( annotated_image, aruco_dict, parameters=aruco_params ) if corners is not None and ids is not None: for corner, i in zip(corners, ids): index = i[0] if not (0 <= index < 4): warnings.warn(f"Invalid ArUco marker ID: {index}") continue aruco_corners[index] = corner assert annotated_image is not None del vcap print( "Found all ArUco markers, enabled privacy mode and set auto day/night mode..." ) self.tapo.setDayNightMode("auto") self.tapo.setPrivacyMode(privacy_mode) aruco.drawDetectedMarkers(annotated_image, aruco_corners, np.array(range(4))) # Annotate the image with the detected markers and apply the perspective transform to the pretty image # Get the outermost points of each marker tl_marker = aruco_corners[0].squeeze()[0] # top left marker tr_marker = aruco_corners[1].squeeze()[1] # top right marker bl_marker = aruco_corners[2].squeeze()[3] # bottom left marker bc_marker = aruco_corners[3].squeeze()[2] # bottom center marker # Calculate the fourth point by computing the line through the bottom markers and intersecting with the vertical # line through the top right marker slope = (bc_marker[1] - bl_marker[1]) / (bc_marker[0] - bl_marker[0]) y_intersection = slope * (tr_marker[0] - bc_marker[0]) + bc_marker[1] fourth_point = [tr_marker[0], y_intersection] rectangle_points = np.array( [tl_marker, bl_marker, fourth_point, tr_marker], dtype="float32" ) # Expand the rectangle slightly centroid = np.mean(rectangle_points, axis=0) expanded_rectangle_points = ( rectangle_points + (rectangle_points - centroid) * 0.025 ) # Draw the expanded rectangle on the annotated image cv2.polylines( annotated_image, [np.int32(expanded_rectangle_points)], isClosed=True, color=(0, 255, 0), thickness=3, ) # Define destination points for perspective transform, maintaining a 3:2 aspect ratio width, height = 300 * 5, 200 * 5 dst_pts = np.array( [[0, 0], [0, height], [width, height], [width, 0]], dtype="float32" ) matrix = cv2.getPerspectiveTransform(expanded_rectangle_points, dst_pts) warped = cv2.warpPerspective(pretty_image, matrix, (width, height)) self.last_aruco_corners = aruco_corners return [warped, annotated_image] async def _photo_command(self, chat_id: int, adjust_perspective: bool = True): async def chat_action_fn(): await, action="upload_photo") await chat_action_fn() loop = asyncio.get_event_loop() photos = await self.take_photo( adjust_perspective=adjust_perspective, chat_action_fn=lambda: loop.create_task(chat_action_fn()), ) jpegs = [cv2.imencode(".jpg", photo)[1].tobytes() for photo in photos] media = [ telegram.InputMediaPhoto( media=telegram.InputFile(jpeg, filename=f"photo{i}.jpg", attach=True), filename=f"photo{i}.jpg", ) for i, jpeg in enumerate(jpegs) ] await chat_id=chat_id, media=media, caption=str("%A, %B %d, %Y %H:%M:%S")), ) async def parse_message(self, msg: Message): match msg.text: case "/start": await chat_id=msg.chat_id, text="Hello, I'm a bot that can send you a photo from the camera.", ) case "/reposition": privacy_mode = self.tapo.getPrivacyMode() self.tapo.setPrivacyMode(False) presets = self._get_presets() if self.profile_name not in presets: await chat_id=msg.chat_id, text=f"Profile '{self.profile_name}' not found", ) return self.tapo.setPreset(presets[self.profile_name]) message = await chat_id=msg.chat_id, text=f"Repositioning to profile '{self.profile_name}'...", ) await asyncio.sleep(5) self.tapo.setPrivacyMode(privacy_mode) await chat_id=msg.chat_id, message_id=message.message_id, text=f"Repositioning complete", ) case "/calibrate": privacy_mode = self.tapo.getPrivacyMode() self.tapo.setPrivacyMode(False) await asyncio.sleep(0.3) self.tapo.calibrateMotor() message = await chat_id=msg.chat_id, text=f"Calibrating, this will take ~25s...", ) await asyncio.sleep(26) await chat_id=msg.chat_id, message_id=message.message_id, text=f"Calibration complete", ) self.tapo.setPrivacyMode(privacy_mode) case "/light_on": await self._send_item_command("ON") await chat_id=msg.chat_id, text=f"Light turned on", ) case "/light_off": await self._send_item_command("OFF") await chat_id=msg.chat_id, text=f"Light turned off", ) case "/light_status": state = await self._get_item_state() await chat_id=msg.chat_id, text=f"Light is {state}", ) case "/photo": await self._photo_command(msg.chat_id, adjust_perspective=True) case "/photo_unprocessed": await self._photo_command(msg.chat_id, adjust_perspective=False) case "/privacy_on": self.tapo.setPrivacyMode(True) await chat_id=msg.chat_id, text=f"Privacy mode turned on", ) case "/privacy_off": self.tapo.setPrivacyMode(False) await chat_id=msg.chat_id, text=f"Privacy mode turned off", ) case "/privacy_status": state = self.tapo.getPrivacyMode() await chat_id=msg.chat_id, text=f"Privacy mode is {state and 'enabled' or 'disabled'}", ) case "/night_mode_auto": self.tapo.setDayNightMode("auto") await chat_id=msg.chat_id, text=f"Night mode set to auto", ) case "/night_mode_on": self.tapo.setDayNightMode("on") await chat_id=msg.chat_id, text=f"Night mode enabled", ) case "/night_mode_off": self.tapo.setDayNightMode("off") await chat_id=msg.chat_id, text=f"Night mode disabled", ) case "/night_mode_status": state = self.tapo.getDayNightMode() await chat_id=msg.chat_id, text=f"Night mode is {state}", ) async def take_photo( self, adjust_perspective=True, timeout=10.0, chat_action_fn=None ) -> List[cv2.typing.MatLike]: item_state = await self._get_item_state() if item_state == "OFF": print("Turning light on") await self._send_item_command("ON") try: return await asyncio.get_event_loop().run_in_executor( self.executor, self._take_photo_blocking, adjust_perspective, timeout, chat_action_fn, ) finally: if item_state == "OFF": print("Turning light back off") await self._send_item_command("OFF") async def run(self): async with = await updater = Updater(, update_queue=asyncio.Queue()) async with updater: queue = await updater.start_polling(allowed_updates=[Update.MESSAGE]) print("Bot is up and running") while True: # noinspection PyBroadException try: upd = cast(Update, await queue.get()) print(upd) if not upd.message or upd.message.chat_id not in self.chat_ids: print("Ignoring message") continue # noinspection PyBroadException try: await self.parse_message(upd.message) except Exception: traceback.print_exc() exc = traceback.format_exc() await chat_id=upd.message.chat_id, text=f"Error: {exc}", ) except Exception: traceback.print_exc() if __name__ == "__main__": bot = Bot()