stats-display/stats-sender/stats_sender.py

220 lines
5.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import struct
import sys
import threading
import time
import traceback
import psutil
import sensors
import serial
from openrgb import OpenRGBClient
from openrgb.utils import DeviceType, RGBColor
# Flipped to True by main() when the send loop ends; read_serial() polls it
# to know when to stop.
should_stop = False

# When True, every frame sent and every line received is echoed to stdout.
debug = True


def _magic(pack, value):
    """Describe a field that always carries the constant *value*."""
    return {'pack': pack, 'mode': 'magic', 'value': value}


def _hwmon(pack, driver, feature):
    """Describe a field read from sensor *feature* of hwmon chip *driver*."""
    return {'pack': pack, 'mode': 'hwmon', 'driver': driver, 'feature': feature}


def _metric(pack, mode):
    """Describe a field filled by calling the module-level function *mode*."""
    return {'pack': pack, 'mode': mode}


# Ordered layout of one outgoing frame. Each entry's 'pack' is the struct
# format code used for that field; loop() packs them little-endian in order.
structure = [
    _magic('I', 0xAAAAAAAA),
    # Temps
    _hwmon('B', 'k10temp', 'temp1'),
    _hwmon('B', 'nct6797', 'temp3'),
    _hwmon('B', 'amdgpu', 'temp1'),
    _hwmon('B', 'nct6797', 'temp5'),
    _hwmon('B', 'nct6797', 'temp1'),
    _hwmon('B', 'nvme', 'temp1'),
    # Fans
    _hwmon('H', 'nct6797', 'fan2'),
    _hwmon('H', 'amdgpu', 'fan1'),
    _hwmon('H', 'nct6797', 'fan7'),
    _hwmon('H', 'nct6797', 'fan5'),
    _hwmon('H', 'nct6797', 'fan4'),
    _hwmon('H', 'nct6797', 'fan3'),
    # CPU/RAM
    _metric('H', 'cpu_freq'),
    _metric('I', 'cpu_load_avg'),
    _metric('I', 'ram_used'),
    _metric('H', 'cpu_perc'),
    _metric('H', 'cpu_perc_max'),
    _metric('H', 'cpu_perc_kernel'),
    _metric('B', 'ram_perc'),
    _metric('B', 'ram_perc_buffers'),
    # Padding
    _magic('B', 0),
    _magic('H', 0),
]
def cpu_n_freq(cpu):
    """Return the current frequency of logical CPU *cpu*, divided by 1000.

    The raw integer comes from that CPU's ``cpufreq/cpuinfo_cur_freq`` sysfs
    file (presumably kHz, which would make the result MHz). The result is a
    float. Errors from opening the file or parsing its content propagate.
    """
    path = f"/sys/devices/system/cpu/cpu{cpu}/cpufreq/cpuinfo_cur_freq"
    with open(path) as handle:
        raw = handle.read()
    return int(raw.strip()) / 1000
def filter_int(x):
    """Return True if ``int(x)`` succeeds, False if it raises ValueError."""
    try:
        int(x)
    except ValueError:
        return False
    else:
        return True
def cpu_freq():
    """Return the highest current frequency among all logical CPUs, as an int.

    Every entry of ``/sys/devices/system/cpu`` named ``cpu<N>`` is probed with
    cpu_n_freq(). A CPU whose frequency cannot be read (for example its
    cpufreq file is missing, unreadable or not a number) is skipped instead of
    aborting the whole reading. If no CPU yields a value the result is 0, the
    same fallback loop() uses for unreadable hwmon sensors.
    """
    readings = []
    for entry in os.listdir("/sys/devices/system/cpu"):
        if not entry.startswith("cpu"):
            continue
        # Drops non-CPU siblings such as "cpufreq" and "cpuidle".
        index = entry.replace("cpu", "")
        if not filter_int(index):
            continue
        try:
            readings.append(cpu_n_freq(int(index)))
        except (OSError, ValueError):
            # One unreadable CPU must not take the other readings down.
            continue
    return int(max(readings, default=0))
def cpu_perc():
    """Return the mean of psutil's per-CPU utilisation percentages as an int.

    The per-CPU values are summed and divided by ``psutil.cpu_count()``; the
    result is truncated, not rounded.
    """
    per_cpu = psutil.cpu_percent(percpu=True)
    return int(sum(per_cpu) / psutil.cpu_count())
def cpu_perc_max():
    """Return the full-scale value of the CPU percentage fields (always 100).

    cpu_perc_kernel() rescales its reading to this value.
    """
    full_scale = 100
    return full_scale
def cpu_perc_kernel():
    """Return the ``system`` CPU time share rescaled to cpu_perc_max().

    psutil reports the share as a percentage; it is divided by 100, multiplied
    by cpu_perc_max() and truncated to an int.
    """
    system_share = psutil.cpu_times_percent().system
    return int(system_share / 100 * cpu_perc_max())
def cpu_load_avg():
    """Return the first psutil load-average figure times 100, as an int.

    Scaling by 100 keeps two decimal places of the load in an integer field.
    """
    first_figure = psutil.getloadavg()[0]
    return int(first_figure * 100)
def ram_perc():
    """Return the share of RAM that is not available, as an int percentage.

    "Not available" means ``total - available`` from psutil.virtual_memory().
    """
    mem = psutil.virtual_memory()
    in_use = mem.total - mem.available
    return int(in_use / mem.total * 100)
def ram_perc_buffers():
    """Return psutil's ``inactive`` memory as an int percentage of total RAM.

    NOTE(review): despite the name, this reads the ``inactive`` field, not
    ``buffers``. Confirm that this is what the display expects.
    """
    mem = psutil.virtual_memory()
    inactive_share = mem.inactive / mem.total
    return int(inactive_share * 100)
def ram_used():
    """Return the amount of RAM that is not available, in whole MiB.

    Computed as ``total - available`` from psutil.virtual_memory(), divided by
    1024**2 and truncated to an int.
    """
    mem = psutil.virtual_memory()
    bytes_in_use = mem.total - mem.available
    return int(bytes_in_use / (1024 ** 2))
def _field_bounds(code):
    """Return the inclusive (low, high) range of integer struct code *code*.

    Returns None for codes that are not fixed-size integers; such fields are
    packed without clamping.
    """
    if code not in 'bBhHiIlLqQ':
        return None
    bits = 8 * struct.calcsize('<' + code)
    if code.islower():
        # Lower-case integer codes are the signed variants.
        return -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    return 0, (1 << bits) - 1


def loop(serial: serial.Serial, hwmon: dict):
    """Build a stats frame and write it to *serial* every two seconds, forever.

    A frame is the fields of ``structure`` packed little-endian without
    padding, followed by one byte holding the XOR of all those bytes, the
    32-bit trailer 0xCCCCCCCC and a final newline.

    Field values come from three places depending on the entry's 'mode':
    'magic' uses the entry's constant, 'hwmon' reads
    ``hwmon[driver][feature].get_value()`` (0 if that fails for any reason),
    and anything else calls the module-level function of that name.

    Each integer value is clamped to what its struct code can hold, so an
    out-of-range reading (e.g. a negative temperature in an unsigned byte)
    saturates instead of raising struct.error and ending the loop.

    Args:
        serial: Open serial port the frames are written to.
        hwmon: Mapping of chip prefix -> feature name -> sensor feature.

    This function does not return.
    """
    # The layout is fixed at runtime, so derive format and limits only once.
    packer = struct.Struct("<" + "".join(item['pack'] for item in structure))
    bounds = [_field_bounds(item['pack']) for item in structure]

    while True:
        struct_data = []
        for item, limits in zip(structure, bounds):
            mode = item['mode']
            if mode == 'magic':
                value = item['value']
            elif mode == 'hwmon':
                # Best effort: a missing chip/feature or a failed read sends 0.
                # noinspection PyBroadException
                try:
                    value = int(hwmon[item['driver']][item['feature']].get_value())
                except Exception:
                    value = 0
            else:
                value = globals()[mode]()
            if limits is not None:
                low, high = limits
                value = min(max(value, low), high)
            struct_data.append(value)

        dto = packer.pack(*struct_data)

        checkxor = 0
        for byte in dto:
            checkxor ^= byte
        dto += struct.pack('<BI', checkxor, 0xCCCCCCCC)

        serial.write(dto + b'\n')
        if debug:
            # The length shown includes the trailing newline byte.
            print(f'send[{len(dto) + 1:>3}]: ', dto.hex(' '), "\\n")
        time.sleep(2)
def read_serial(s: serial.Serial):
    """Read lines from *s* until ``should_stop`` is set and react to lid events.

    Each line has one trailing newline removed; empty results are ignored.
    With ``debug`` on, the line is echoed to stdout. A line whose stripped
    form starts with ``LID `` switches the fridge light: off if it contains
    ``LID CLOSED``, otherwise on if it contains ``LID OPEN``.

    Args:
        s: Open serial port to read from.
    """
    while not should_stop:
        raw = s.readline(1024)
        message = raw[:-1] if raw.endswith(b'\n') else raw
        if not message:
            continue

        if debug:
            print(f"recv[{len(message):>3}]: ", message.decode(errors='replace'), )

        # Set fridge light on if the lid is open
        if not message.strip().startswith(b"LID "):
            continue
        # "CLOSED" is checked first because it wins when both markers appear.
        if b"LID CLOSED" in message:
            set_fridge_lights(False)
        elif b"LID OPEN" in message:
            set_fridge_lights(True)
def set_fridge_lights(on: bool):
    """Set the motherboard zone "Onboard LED 0" to green (*on*) or black.

    A fresh OpenRGB client is created for each call and disconnected again
    afterwards. Any failure along the way is printed as a traceback and
    otherwise ignored, so the caller never sees an exception from here.

    Args:
        on: True for full green, False for all channels zero.
    """
    client = None
    # noinspection PyBroadException
    try:
        client = OpenRGBClient()
        board = client.get_devices_by_type(DeviceType.MOTHERBOARD)[0]
        # Raises StopIteration (handled below) if no zone has that name.
        zone = next(z for z in board.zones if z.name == "Onboard LED 0")
        green = 0xFF if on else 0x00
        zone.set_color(RGBColor(0x00, green, 0x00))
    except Exception:
        traceback.print_exc()
    finally:
        if client is not None:
            client.disconnect()
def main():
    """Entry point: open the serial port given on the command line and run.

    Expects the tty path as the first argument; prints a usage line and exits
    with status 1 if it is missing. Builds a ``{chip prefix: {feature name:
    feature}}`` map from the detected sensor chips, starts read_serial() on a
    background thread and runs loop() on the calling thread.

    When loop() ends (normally only through an exception such as
    KeyboardInterrupt), ``should_stop`` is set, the reader thread is joined
    and the serial port is closed before the exception propagates.
    """
    global should_stop

    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} [tty]")
        # sys.exit rather than the site-provided exit(), which may be absent.
        sys.exit(1)

    sensors.init()
    # If two chips share a prefix, the later one replaces the earlier.
    hwmon = {
        chip.prefix.decode(): {feature.name: feature for feature in chip}
        for chip in sensors.iter_detected_chips()
    }

    s = serial.Serial(port=sys.argv[1], baudrate=115200, timeout=0.5)
    try:
        t = threading.Thread(target=read_serial, args=(s,))
        t.start()
        try:
            loop(s, hwmon)
        finally:
            should_stop = True
            # The reader notices the flag after its current read returns;
            # wait for it so the port is not closed under it.
            t.join()
    finally:
        s.close()
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()