import asyncio import os import traceback import warnings from typing import cast, Optional import aiohttp import cv2 import telegram from aiohttp import BasicAuth from pytapo import Tapo from telegram import Update, Message from telegram.ext import Updater from urllib3.exceptions import InsecureRequestWarning warnings.filterwarnings("ignore", category=InsecureRequestWarning) class Bot: def __init__(self): self.token = os.environ["BOT_TOKEN"] self.camera_ip = os.environ["CAMERA_IP"] self.camera_user = os.environ["CAMERA_USER"] self.camera_password = os.environ["CAMERA_PASSWORD"] self.chat_id = int(os.environ["CHAT_ID"]) self.profile_name = os.environ.get("CAMERA_PROFILE_NAME", "board") self.openhab_url = os.environ["OPENHAB_URL"] self.openhab_token = os.environ["OPENHAB_TOKEN"] self.openhab_item = os.environ["OPENHAB_ITEM"] self.bot = telegram.Bot(token=self.token) self.me = None self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password) def _get_presets(self): presets = self.tapo.getPresets() return {v: k for k, v in presets.items()} async def _get_item_state(self): url = f"{self.openhab_url}/rest/items/{self.openhab_item}/state" # use aiohttp instead of requests to avoid blocking async with aiohttp.ClientSession() as session: async with session.get(url, auth=BasicAuth(self.openhab_token, "")) as resp: return await resp.text() async def _send_item_command(self, command): url = f"{self.openhab_url}/rest/items/{self.openhab_item}" async with aiohttp.ClientSession() as session: async with session.post( url, auth=BasicAuth(self.openhab_token, ""), data=command, headers={"Content-Type": "text/plain", "Accept": "*/*"}, ) as resp: return await resp.text() def _take_photo(self) -> Optional[bytes]: vcap = cv2.VideoCapture( f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1" ) ret, frame = vcap.read() if not ret: return None return frame async def parse_message(self, msg: Message): match msg.text: case "/start": await self.bot.send_message( chat_id=msg.chat_id, text="Hello, I'm a bot that can send you a photo from the camera.", ) case "/reposition": presets = self._get_presets() if self.profile_name not in presets: await self.bot.send_message( chat_id=msg.chat_id, text=f"Profile '{self.profile_name}' not found", ) return self.tapo.setPreset(presets[self.profile_name]) await self.bot.send_message( chat_id=msg.chat_id, text=f"Repositioned to profile '{self.profile_name}'", ) case "/calibrate": self.tapo.calibrateMotor() presets = self._get_presets() if self.profile_name in presets: self.tapo.setPreset(presets[self.profile_name]) await self.bot.send_message( chat_id=msg.chat_id, text=f"Calibrated and repositioned to profile '{self.profile_name}'", ) case "/light_on": await self._send_item_command("ON") await self.bot.send_message( chat_id=msg.chat_id, text=f"Light turned on", ) case "/light_off": await self._send_item_command("OFF") await self.bot.send_message( chat_id=msg.chat_id, text=f"Light turned off", ) case "/light_status": state = await self._get_item_state() await self.bot.send_message( chat_id=msg.chat_id, text=f"Light is {state}", ) case "/photo": await self.bot.send_chat_action( chat_id=msg.chat_id, action="upload_photo" ) light_state = await self._get_item_state() if light_state == "OFF": print("Turning light on") await self._send_item_command("ON") print("Disabling privacy mode") self.tapo.setPrivacyMode(False) await asyncio.sleep(2) print("Taking photo") photo = self._take_photo() print("Enabling privacy mode") self.tapo.setPrivacyMode(True) if light_state == "OFF": print("Turning light back off") await self._send_item_command("OFF") if photo is None: await self.bot.send_message( chat_id=msg.chat_id, text=f"Error taking photo", ) return # Encode photo to jpeg photo = cv2.imencode(".jpg", photo)[1].tobytes() await self.bot.send_photo( chat_id=msg.chat_id, photo=telegram.InputFile(photo, filename="photo.jpg"), ) async def run(self): async with self.bot: self.me = await self.bot.get_me() updater = Updater(bot=self.bot, update_queue=asyncio.Queue()) async with updater: queue = await updater.start_polling(allowed_updates=[Update.MESSAGE]) while True: # noinspection PyBroadException try: upd = cast(Update, await queue.get()) print(upd) if not upd.message or upd.message.chat_id != self.chat_id: print("Ignoring message") continue await self.parse_message(upd.message) except Exception: traceback.print_exc() if __name__ == "__main__": bot = Bot() asyncio.run(bot.run())