174 lines
6.4 KiB
Python
174 lines
6.4 KiB
Python
|
import asyncio
|
||
|
import os
|
||
|
import traceback
|
||
|
import warnings
|
||
|
from typing import cast, Optional
|
||
|
|
||
|
import aiohttp
|
||
|
import cv2
|
||
|
import telegram
|
||
|
from aiohttp import BasicAuth
|
||
|
from pytapo import Tapo
|
||
|
from telegram import Update, Message
|
||
|
from telegram.ext import Updater
|
||
|
from urllib3.exceptions import InsecureRequestWarning
|
||
|
|
||
|
# Suppress urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably needed because the camera client talks to the
# device over HTTPS without certificate verification — confirm.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
|
||
|
|
||
|
|
||
|
class Bot:
|
||
|
def __init__(self):
|
||
|
self.token = os.environ["BOT_TOKEN"]
|
||
|
self.camera_ip = os.environ["CAMERA_IP"]
|
||
|
self.camera_user = os.environ["CAMERA_USER"]
|
||
|
self.camera_password = os.environ["CAMERA_PASSWORD"]
|
||
|
self.chat_id = int(os.environ["CHAT_ID"])
|
||
|
self.profile_name = os.environ.get("CAMERA_PROFILE_NAME", "board")
|
||
|
self.openhab_url = os.environ["OPENHAB_URL"]
|
||
|
self.openhab_token = os.environ["OPENHAB_TOKEN"]
|
||
|
self.openhab_item = os.environ["OPENHAB_ITEM"]
|
||
|
|
||
|
self.bot = telegram.Bot(token=self.token)
|
||
|
self.me = None
|
||
|
|
||
|
self.tapo = Tapo(self.camera_ip, self.camera_user, self.camera_password)
|
||
|
|
||
|
def _get_presets(self):
|
||
|
presets = self.tapo.getPresets()
|
||
|
return {v: k for k, v in presets.items()}
|
||
|
|
||
|
async def _get_item_state(self):
|
||
|
url = f"{self.openhab_url}/rest/items/{self.openhab_item}/state"
|
||
|
# use aiohttp instead of requests to avoid blocking
|
||
|
async with aiohttp.ClientSession() as session:
|
||
|
async with session.get(url, auth=BasicAuth(self.openhab_token, "")) as resp:
|
||
|
return await resp.text()
|
||
|
|
||
|
async def _send_item_command(self, command):
|
||
|
url = f"{self.openhab_url}/rest/items/{self.openhab_item}"
|
||
|
async with aiohttp.ClientSession() as session:
|
||
|
async with session.post(
|
||
|
url,
|
||
|
auth=BasicAuth(self.openhab_token, ""),
|
||
|
data=command,
|
||
|
headers={"Content-Type": "text/plain", "Accept": "*/*"},
|
||
|
) as resp:
|
||
|
return await resp.text()
|
||
|
|
||
|
def _take_photo(self) -> Optional[bytes]:
|
||
|
vcap = cv2.VideoCapture(
|
||
|
f"rtsp://{self.camera_user}:{self.camera_password}@{self.camera_ip}:554/stream1"
|
||
|
)
|
||
|
ret, frame = vcap.read()
|
||
|
if not ret:
|
||
|
return None
|
||
|
return frame
|
||
|
|
||
|
async def parse_message(self, msg: Message):
|
||
|
match msg.text:
|
||
|
case "/start":
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text="Hello, I'm a bot that can send you a photo from the camera.",
|
||
|
)
|
||
|
case "/reposition":
|
||
|
presets = self._get_presets()
|
||
|
if self.profile_name not in presets:
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Profile '{self.profile_name}' not found",
|
||
|
)
|
||
|
return
|
||
|
self.tapo.setPreset(presets[self.profile_name])
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Repositioned to profile '{self.profile_name}'",
|
||
|
)
|
||
|
case "/calibrate":
|
||
|
self.tapo.calibrateMotor()
|
||
|
presets = self._get_presets()
|
||
|
if self.profile_name in presets:
|
||
|
self.tapo.setPreset(presets[self.profile_name])
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Calibrated and repositioned to profile '{self.profile_name}'",
|
||
|
)
|
||
|
case "/light_on":
|
||
|
await self._send_item_command("ON")
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Light turned on",
|
||
|
)
|
||
|
case "/light_off":
|
||
|
await self._send_item_command("OFF")
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Light turned off",
|
||
|
)
|
||
|
case "/light_status":
|
||
|
state = await self._get_item_state()
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Light is {state}",
|
||
|
)
|
||
|
case "/photo":
|
||
|
await self.bot.send_chat_action(
|
||
|
chat_id=msg.chat_id, action="upload_photo"
|
||
|
)
|
||
|
|
||
|
light_state = await self._get_item_state()
|
||
|
if light_state == "OFF":
|
||
|
print("Turning light on")
|
||
|
await self._send_item_command("ON")
|
||
|
|
||
|
print("Disabling privacy mode")
|
||
|
self.tapo.setPrivacyMode(False)
|
||
|
await asyncio.sleep(2)
|
||
|
print("Taking photo")
|
||
|
photo = self._take_photo()
|
||
|
print("Enabling privacy mode")
|
||
|
self.tapo.setPrivacyMode(True)
|
||
|
|
||
|
if light_state == "OFF":
|
||
|
print("Turning light back off")
|
||
|
await self._send_item_command("OFF")
|
||
|
|
||
|
if photo is None:
|
||
|
await self.bot.send_message(
|
||
|
chat_id=msg.chat_id,
|
||
|
text=f"Error taking photo",
|
||
|
)
|
||
|
return
|
||
|
# Encode photo to jpeg
|
||
|
photo = cv2.imencode(".jpg", photo)[1].tobytes()
|
||
|
|
||
|
await self.bot.send_photo(
|
||
|
chat_id=msg.chat_id,
|
||
|
photo=telegram.InputFile(photo, filename="photo.jpg"),
|
||
|
)
|
||
|
|
||
|
async def run(self):
|
||
|
async with self.bot:
|
||
|
self.me = await self.bot.get_me()
|
||
|
|
||
|
updater = Updater(bot=self.bot, update_queue=asyncio.Queue())
|
||
|
async with updater:
|
||
|
queue = await updater.start_polling(allowed_updates=[Update.MESSAGE])
|
||
|
|
||
|
while True:
|
||
|
# noinspection PyBroadException
|
||
|
try:
|
||
|
upd = cast(Update, await queue.get())
|
||
|
print(upd)
|
||
|
if not upd.message or upd.message.chat_id != self.chat_id:
|
||
|
print("Ignoring message")
|
||
|
continue
|
||
|
await self.parse_message(upd.message)
|
||
|
except Exception:
|
||
|
traceback.print_exc()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Script entry point: build the bot from the environment and run its
    # polling loop on a fresh event loop.
    asyncio.run(Bot().run())
|