#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Stream host telemetry to a serial device and react to its lid messages.

Roughly every two seconds a fixed-layout binary frame (see ``structure``)
holding hwmon readings and CPU/RAM statistics is written to the serial port
named on the command line.  A background thread reads text lines coming
back from the port and switches a motherboard LED zone through OpenRGB
when a ``LID OPEN`` / ``LID CLOSED`` message arrives.
"""
import os
import struct
import sys
import threading
import time
import traceback

import psutil
import sensors
import serial
from openrgb import OpenRGBClient
from openrgb.utils import DeviceType, RGBColor

# Set to True by main() to ask the reader thread to leave its loop.
should_stop = False
# When True, every sent frame and every received line is echoed to stdout.
debug = True

# Layout of one frame, in wire order.  'pack' is the struct format code of
# the field; 'mode' selects where the value comes from:
#   magic  - the constant stored under 'value'
#   hwmon  - feature 'feature' of the detected chip whose prefix is 'driver'
#   other  - name of a module-level function that is called without arguments
structure = [
    {'pack': 'I', 'mode': 'magic', 'value': 0xAAAAAAAA},
    # Temps
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'k10temp', 'feature': 'temp1'},
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'temp3'},
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'amdgpu', 'feature': 'temp1'},
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'temp5'},
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'temp1'},
    {'pack': 'B', 'mode': 'hwmon', 'driver': 'nvme', 'feature': 'temp1'},
    # Fans
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'fan2'},
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'amdgpu', 'feature': 'fan1'},
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'fan7'},
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'fan5'},
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'fan4'},
    {'pack': 'H', 'mode': 'hwmon', 'driver': 'nct6797', 'feature': 'fan3'},
    # CPU/RAM
    {'pack': 'H', 'mode': 'cpu_freq'},
    {'pack': 'I', 'mode': 'cpu_load_avg'},
    {'pack': 'I', 'mode': 'ram_used'},
    {'pack': 'H', 'mode': 'cpu_perc'},
    {'pack': 'H', 'mode': 'cpu_perc_max'},
    {'pack': 'H', 'mode': 'cpu_perc_kernel'},
    {'pack': 'B', 'mode': 'ram_perc'},
    {'pack': 'B', 'mode': 'ram_perc_buffers'},
    # Padding
    {'pack': 'B', 'mode': 'magic', 'value': 0},
    {'pack': 'H', 'mode': 'magic', 'value': 0},
]

# Largest value each unsigned struct code used in ``structure`` can encode.
_PACK_LIMITS = {'B': 0xFF, 'H': 0xFFFF, 'I': 0xFFFFFFFF}


def cpu_n_freq(cpu):
    """Return the current frequency of logical CPU *cpu* from sysfs.

    The raw ``cpuinfo_cur_freq`` value is divided by 1000 (the kernel's
    cpufreq documentation gives the sysfs value in kHz, so this is MHz).
    Raises OSError if the file cannot be opened and ValueError if its
    content is not an integer.
    """
    with open(f"/sys/devices/system/cpu/cpu{cpu}/cpufreq/cpuinfo_cur_freq") as f:
        return int(f.read().strip()) / 1000


def filter_int(x):
    """Return True if ``int(x)`` succeeds, False if it raises ValueError."""
    try:
        int(x)
    except ValueError:
        return False
    return True


def cpu_freq():
    """Return the highest current frequency among all CPUs, as an int."""
    # Directory entries such as "cpufreq" or "cpuidle" also start with
    # "cpu"; only those whose remainder is an integer are per-CPU entries.
    suffixes = (
        entry.replace("cpu", "")
        for entry in os.listdir("/sys/devices/system/cpu")
        if entry.startswith("cpu")
    )
    return int(max(cpu_n_freq(int(core)) for core in suffixes if filter_int(core)))


def cpu_perc():
    """Return the mean of the per-CPU utilisation percentages, truncated."""
    return int(sum(psutil.cpu_percent(percpu=True)) / psutil.cpu_count())


def cpu_perc_max():
    """Return the value that represents full load on the ``cpu_perc`` scale."""
    return 100


def cpu_perc_kernel():
    """Return the ``system`` CPU time share, scaled to ``cpu_perc_max()``."""
    return int(psutil.cpu_times_percent().system / 100 * cpu_perc_max())


def cpu_load_avg():
    """Return the first load-average figure multiplied by 100, truncated."""
    return int(psutil.getloadavg()[0] * 100)


def ram_perc():
    """Return the percentage of total memory that is not available."""
    m = psutil.virtual_memory()
    return int((m.total - m.available) / m.total * 100)


def ram_perc_buffers():
    """Return psutil's ``inactive`` memory as a percentage of total memory.

    Despite the name, the figure is taken from ``inactive``, not ``buffers``.
    """
    m = psutil.virtual_memory()
    return int(m.inactive / m.total * 100)


def ram_used():
    """Return the memory that is not available, in MiB."""
    m = psutil.virtual_memory()
    return int((m.total - m.available) / (1024 ** 2))


def _clamp_for_pack(code, value):
    """Clamp *value* into the range struct code *code* can encode.

    struct.pack() raises struct.error for out-of-range integers, so a
    single odd reading (for example a negative temperature) would
    otherwise terminate the sender loop.  Codes that are not listed in
    ``_PACK_LIMITS`` are passed through unchanged.
    """
    upper = _PACK_LIMITS.get(code)
    if upper is None:
        return value
    return min(max(value, 0), upper)


def _read_field(item, hwmon):
    """Return the integer value for one ``structure`` entry."""
    mode = item['mode']
    if mode == 'magic':
        return item['value']
    if mode == 'hwmon':
        # Best effort: a chip or feature that is missing on this machine,
        # or a failed read, is reported as 0 instead of stopping the sender.
        # noinspection PyBroadException
        try:
            return int(hwmon[item['driver']][item['feature']].get_value())
        except Exception:
            return 0
    # Any other mode names one of the collector functions of this module.
    return globals()[mode]()


def _build_frame(hwmon):
    """Pack the ``structure`` fields little-endian and append the checksum.

    The checksum is one byte: the XOR of all payload bytes.  With the
    table above that is 43 payload bytes plus 1 checksum byte.
    """
    struct_fmt = "<" + "".join(item['pack'] for item in structure)
    struct_data = [
        _clamp_for_pack(item['pack'], _read_field(item, hwmon))
        for item in structure
    ]
    payload = struct.pack(struct_fmt, *struct_data)
    checkxor = 0
    for byte in payload:
        checkxor ^= byte
    return payload + struct.pack('<B', checkxor)


def loop(serial: serial.Serial, hwmon: dict):
    """Send a telemetry frame over *serial* every two seconds, forever.

    *hwmon* maps a chip prefix to a dict of feature name -> feature object
    (as built in ``main``).  The function only returns by raising.
    """
    while True:
        dto = _build_frame(hwmon)
        serial.write(dto)
        if debug:
            print(f"sent[{len(dto):>3}]: ", dto.hex(' '), "\\n")
        time.sleep(2)


def read_serial(s: serial.Serial):
    """Read lines from *s* until ``should_stop`` is set and act on LID lines.

    Meant to run in its own thread; the read timeout configured on the
    port bounds how long it takes to notice ``should_stop``.
    """
    while not should_stop:
        line = s.readline(1024)
        if line.endswith(b'\n'):
            line = line[:-1]
        if not line:
            # Timeout without data, or an empty line.
            continue
        if debug:
            print(f"recv[{len(line):>3}]: ", line.decode(errors='replace'), )
        # Set fridge light on if the lid is open
        if not line.strip().startswith(b"LID "):
            continue
        # CLOSED is tested first so it wins if a line contains both markers.
        if b"LID CLOSED" in line:
            set_fridge_lights(False)
        elif b"LID OPEN" in line:
            set_fridge_lights(True)


def set_fridge_lights(on: bool):
    """Set the motherboard zone "Onboard LED 0" to green (*on*) or black.

    Every failure (no OpenRGB server, no motherboard device, zone not
    found) is printed with its traceback and otherwise ignored, so the
    calling reader thread keeps running.
    """
    client = None
    # noinspection PyBroadException
    try:
        client = OpenRGBClient()
        motherboard = client.get_devices_by_type(DeviceType.MOTHERBOARD)[0]
        onboard_led_0 = next(
            zone for zone in motherboard.zones if zone.name == "Onboard LED 0"
        )
        onboard_led_0.set_color(RGBColor(0x00, 0xFF if on else 0x00, 0x00))
    except Exception:
        traceback.print_exc()
    finally:
        if client is not None:
            client.disconnect()


def main():
    """Open the serial port given as first argument and run until interrupted."""
    global should_stop

    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} [tty]")
        sys.exit(1)

    sensors.init()

    # chip prefix -> {feature name -> feature object}; a later chip with
    # the same prefix replaces an earlier one.
    hwmon = {}
    for chip in sensors.iter_detected_chips():
        hwmon[chip.prefix.decode()] = {f.name: f for f in chip}

    # The context manager closes the port on every exit path.
    with serial.Serial(port=sys.argv[1], baudrate=115200, timeout=0.5) as s:
        t = threading.Thread(target=read_serial, args=(s,))
        t.start()
        try:
            loop(s, hwmon)
        finally:
            should_stop = True
            # Let the reader finish before the port is closed under it.
            t.join()


if __name__ == '__main__':
    main()