board_camera_bot/bot.py
2023-09-15 00:11:13 +02:00

174 lines
6.4 KiB
Python

import asyncio
import os
import traceback
import warnings
from typing import cast, Optional
import aiohttp
import cv2
import telegram
from aiohttp import BasicAuth
from pytapo import Tapo
from telegram import Update, Message
from telegram.ext import Updater
from urllib3.exceptions import InsecureRequestWarning
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
class Bot:
def __init__(self):
self.token = os.environ["BOT_TOKEN"]
self.camera_ip = os.environ["CAMERA_IP"]
self.camera_user = os.environ["CAMERA_USER"]
self.camera_password = os.environ["CAMERA_PASSWORD"]
self.chat_id = int(os.environ["CHAT_ID"])
self.profile_name = os.environ.get("CAMERA_PROFILE_NAME", "board")
self.openhab_url = os.environ["OPENHAB_URL"]
self.openhab_token = os.environ["OPENHAB_TOKEN"]
self.openhab_item = os.environ["OPENHAB_ITEM"]
self.bot = telegram.Bot(token=self.token)
self.me = None
self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password)
def _get_presets(self):
presets = self.tapo.getPresets()
return {v: k for k, v in presets.items()}
async def _get_item_state(self):
url = f"{self.openhab_url}/rest/items/{self.openhab_item}/state"
# use aiohttp instead of requests to avoid blocking
async with aiohttp.ClientSession() as session:
async with session.get(url, auth=BasicAuth(self.openhab_token, "")) as resp:
return await resp.text()
async def _send_item_command(self, command):
url = f"{self.openhab_url}/rest/items/{self.openhab_item}"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
auth=BasicAuth(self.openhab_token, ""),
data=command,
headers={"Content-Type": "text/plain", "Accept": "*/*"},
) as resp:
return await resp.text()
def _take_photo(self) -> Optional[bytes]:
vcap = cv2.VideoCapture(
f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1"
)
ret, frame = vcap.read()
if not ret:
return None
return frame
async def parse_message(self, msg: Message):
match msg.text:
case "/start":
await self.bot.send_message(
chat_id=msg.chat_id,
text="Hello, I'm a bot that can send you a photo from the camera.",
)
case "/reposition":
presets = self._get_presets()
if self.profile_name not in presets:
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Profile '{self.profile_name}' not found",
)
return
self.tapo.setPreset(presets[self.profile_name])
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Repositioned to profile '{self.profile_name}'",
)
case "/calibrate":
self.tapo.calibrateMotor()
presets = self._get_presets()
if self.profile_name in presets:
self.tapo.setPreset(presets[self.profile_name])
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Calibrated and repositioned to profile '{self.profile_name}'",
)
case "/light_on":
await self._send_item_command("ON")
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Light turned on",
)
case "/light_off":
await self._send_item_command("OFF")
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Light turned off",
)
case "/light_status":
state = await self._get_item_state()
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Light is {state}",
)
case "/photo":
await self.bot.send_chat_action(
chat_id=msg.chat_id, action="upload_photo"
)
light_state = await self._get_item_state()
if light_state == "OFF":
print("Turning light on")
await self._send_item_command("ON")
print("Disabling privacy mode")
self.tapo.setPrivacyMode(False)
await asyncio.sleep(2)
print("Taking photo")
photo = self._take_photo()
print("Enabling privacy mode")
self.tapo.setPrivacyMode(True)
if light_state == "OFF":
print("Turning light back off")
await self._send_item_command("OFF")
if photo is None:
await self.bot.send_message(
chat_id=msg.chat_id,
text=f"Error taking photo",
)
return
# Encode photo to jpeg
photo = cv2.imencode(".jpg", photo)[1].tobytes()
await self.bot.send_photo(
chat_id=msg.chat_id,
photo=telegram.InputFile(photo, filename="photo.jpg"),
)
async def run(self):
async with self.bot:
self.me = await self.bot.get_me()
updater = Updater(bot=self.bot, update_queue=asyncio.Queue())
async with updater:
queue = await updater.start_polling(allowed_updates=[Update.MESSAGE])
while True:
# noinspection PyBroadException
try:
upd = cast(Update, await queue.get())
print(upd)
if not upd.message or upd.message.chat_id != self.chat_id:
print("Ignoring message")
continue
await self.parse_message(upd.message)
except Exception:
traceback.print_exc()
if __name__ == "__main__":
bot = Bot()
asyncio.run(bot.run())