Запрос:
~G
Ответ:
~G25.0
где 25.0 - температура в градусах Цельсия Запрос:
~G
Ответ:
~G26.6;13.5
где 26.6 - температура в градусах Цельсия, 13.5 - измеренная влажность в процентах Запрос:
~W1000
Ответ:
~F1000
~G24.0
~G24.0
... echo "~W1000" > /dev/ttyACM0 cat USBPORT | sed 's/~G//' | socat - udp-sendto:127.0.0.1:5000 cat /dev/ttyACM1 | sed 's/~G//' | { read temp; if [[ -z "$temp" ]]; then echo 'No data'; elif [[ $(echo "$temp>30" | bc -l) -ne 0 ]]; then echo 'Overtemp'; else echo 'Normal'; fi; } $port = new-object System.IO.Ports.SerialPort COM8, 9600, None, 8, One; $port.Open(); $data = $port.ReadLine(); $port.Close(); if (-not $data) { "no data" } else { $temp = $data -replace '~G', ''; if ([float]$temp -gt 30) { "Overtemp" } else { "Normal" } } Запрос:
~W0
Ответ:
~F0
socat -d -d TCP4-LISTEN:6000,fork /dev/ttyACM0,raw,echo=0 import serial
import time
import argparse
import datetime
def parse_g_response(data: str):
# Ожидаемые форматы:
# ~GE
# ~G<temp>
# ~G<temp>;<hum>
if not data.startswith("~G"):
return ("unexpected", data)
payload = data[2:]
if payload == "E":
return ("error", None)
try:
if ";" in payload:
temp_str, hum_str = payload.split(";", 1)
temp = float(temp_str)
hum = float(hum_str)
return ("temp_hum", (temp, hum))
else:
temp = float(payload)
return ("temp_only", temp)
except ValueError:
return ("unexpected", data)
def main():
parser = argparse.ArgumentParser(description="Опрос ODTEMP командой ~G")
parser.add_argument(
"--port",
required=True,
help="Последовательный порт (например, COM3 или /dev/ttyUSB0)",
)
parser.add_argument(
"--interval",
type=float,
default=1.0,
help="Интервал опроса в секундах (по умолчанию: 1.0)",
)
parser.add_argument(
"--read-timeout",
type=float,
default=1.0,
help="Таймаут чтения из порта в секундах (по умолчанию: 1.0)",
)
args = parser.parse_args()
ser = serial.Serial(args.port, 115200, timeout=args.read_timeout)
try:
print(f"Подключено к {args.port}")
print(f"Опрос каждые {args.interval} сек")
while True:
current_time = datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S")
ser.write(b"~G")
data = ser.readline().decode("utf-8", errors="replace").strip()
kind, value = parse_g_response(data)
if kind == "error":
print(f"[{current_time}] Ошибка получения данных с сенсора")
elif kind == "temp_hum":
temp, hum = value
print(f"[{current_time}] Температура: {temp:.2f} °C, Влажность: {hum:.2f} %")
elif kind == "temp_only":
print(f"[{current_time}] Температура: {value:.2f} °C")
else:
print(f"[{current_time}] Неожиданный ответ: {value}")
time.sleep(args.interval)
except KeyboardInterrupt:
print("\nОпрос остановлен пользователем")
except Exception as e:
print(f"Ошибка: {e}")
finally:
ser.close()
print("Соединение закрыто")
if __name__ == "__main__":
main()
Пример небольшой программы для сохранения данных в лог, который работает с интерфейсом HID вместо CDC.
Для работы нужен python3 и библиотека hidapi.
• #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import hid
import re
import sys
import time
# Константы устройства
OD_VID = 0x0483
OD_IOT_PID = 0xA26A
# HID report IDs
HID_DATA_REPORT_ID = 1
HID_EVENT_REPORT_ID = 2
HID_FW_REPORT_ID = 3
HID_CMD_REPORT_ID = 4
SENSOR_STATES = {
0: "NORMAL",
1: "ACCEPTABLE",
2: "CRITICAL",
3: "INTERROR",
4: "CUSTOM",
}
def now_str() -> str:
return datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S")
def available_devices():
"""
Возвращает список найденных OD HID-устройств.
"""
devices = hid.enumerate(OD_VID, OD_IOT_PID)
found_devices = []
iot_product = re.compile(r"IOT\s+(.+?)(?:\s+HID)?$", re.IGNORECASE)
for idx, d in enumerate(devices):
vendor = d.get("manufacturer_string") or ""
product = d.get("product_string") or ""
serial_raw = d.get("serial_number") or ""
serial_show = serial_raw or "S/N"
path = d.get("path")
print(f"[{idx}] Найдено устройство: {vendor} / {product} @ {serial_show}")
match = iot_product.match(product.strip()) if product else None
if vendor == "Open Development" and match and path:
found_devices.append(
{
"index": idx,
"device_type": match.group(1).strip(),
"serial": serial_raw,
"serial_show": serial_show,
"path": path,
"vendor": vendor,
"product": product,
}
)
else:
print(" -> Устройство не соответствует критериям OD HID")
return found_devices
def choose_device(devices, serial_number=None, index=None):
if not devices:
return None
if serial_number:
for d in devices:
if d["serial"] == serial_number or d["serial_show"] == serial_number:
return d
print(f"Устройство с серийным номером {serial_number} не найдено, используем первое
доступное")
if index is not None:
if 0 <= index < len(devices):
return devices[index]
print(f"Индекс {index} вне диапазона 0..{len(devices)-1}, используем первое доступное")
return devices[0]
def open_device(device_info):
"""
Открывает HID-устройство по path.
"""
dev = hid.device()
dev.open_path(device_info["path"])
return dev
def detect_humidity_capability(dev):
"""
Пытается определить поддержку humidity через Feature Report (ID=4).
Для temp+hum (например DHT/HDC) в payload есть поля влажности,
и байты [17:23] обычно не нулевые.
"""
try:
fr = bytes(dev.get_feature_report(HID_CMD_REPORT_ID, 64))
if not fr:
return None
if fr[0] != HID_CMD_REPORT_ID:
return None
if len(fr) >= 23:
if any(fr[17:23]):
return True
return None
if len(fr) <= 16:
return False
return None
except Exception:
return None
def parse_data_report(payload, has_humidity_capability=None):
"""
DATA report payload после report_id.
Протокол:
- temp-only: [temp_lo temp_hi]
- temp+hum : [temp_lo temp_hi hum_lo hum_hi]
Значения в 0.01 единицах (int16 little-endian).
"""
if len(payload) < 2:
return None
temp_raw = int.from_bytes(payload[0:2], byteorder="little", signed=True)
temp = temp_raw / 100.0
hum = None
if len(payload) >= 4:
hum_raw = int.from_bytes(payload[2:4], byteorder="little", signed=True)
if has_humidity_capability is True:
if 0 <= hum_raw <= 10000:
hum = hum_raw / 100.0
elif has_humidity_capability is None:
if 0 <= hum_raw <= 10000 and any(payload[2:4]):
hum = hum_raw / 100.0
return {"temperature": temp, "humidity": hum}
def process_custom_command(cmd, payload):
print(f"[{now_str()}] Получена нестандартная команда: cmd=0x{cmd:02X},
payload={payload.hex()}")
def sensor_state_changed(state):
state_name = SENSOR_STATES.get(state, f"UNKNOWN({state})")
print(f"[{now_str()}] Изменение состояния сенсора: {state_name}")
def decode_fw_report(payload):
if not payload:
return ""
length = payload[0]
raw = payload[1: 1 + length]
try:
return raw.decode("latin1", errors="replace")
except Exception:
return raw.hex()
def main():
parser = argparse.ArgumentParser(description="Опрос OD HID датчиков")
parser.add_argument("--serial", help="Серийный номер устройства", default=None)
parser.add_argument("--index", type=int, default=None, help="Индекс устройства из списка
enumerate")
parser.add_argument("--read-timeout-ms", type=int, default=100, help="Таймаут чтения HID в
мс")
parser.add_argument("--device-timeout", type=float, default=10.0, help="Таймаут молчания
устройства, сек")
args = parser.parse_args()
devices = available_devices()
if not devices:
print("Устройства не найдены")
sys.exit(0)
device_info = choose_device(devices, serial_number=args.serial, index=args.index)
if not device_info:
print("Подходящее устройство не найдено")
sys.exit(1)
print(f"\nОткрываем устройство: {device_info['device_type']}
({device_info['serial_show']})")
dev = open_device(device_info)
try:
has_humidity_capability = detect_humidity_capability(dev)
last_data_time = time.time()
while True:
try:
report = dev.read(64, args.read_timeout_ms)
if report:
report = bytes(report)
# Некоторые стеки могут вернуть ведущий 0 перед report_id
if len(report) > 1 and report[0] == 0 and report[1] in (
HID_DATA_REPORT_ID,
HID_EVENT_REPORT_ID,
HID_FW_REPORT_ID,
HID_CMD_REPORT_ID,
):
report = report[1:]
report_id = report[0]
payload = report[1:]
last_data_time = time.time()
if report_id == HID_DATA_REPORT_ID:
value = parse_data_report(payload,
has_humidity_capability=has_humidity_capability)
if value is None:
print(f"[{now_str()}] Некорректный DATA report: {report.hex()}")
continue
temp = value["temperature"]
hum = value["humidity"]
# Дополнительное автоопределение по фактическому payload
if has_humidity_capability is None and hum is None and len(payload) >=
4:
hum_raw = int.from_bytes(payload[2:4], byteorder="little",
signed=True)
if 0 <= hum_raw <= 10000 and any(payload[2:4]):
hum = hum_raw / 100.0
has_humidity_capability = True
print(f"[{now_str()}] Автоопределение: обнаружен канал
влажности")
if hum is None:
print(f"[{now_str()}] Температура: {temp:.2f} °C")
else:
print(f"[{now_str()}] Температура: {temp:.2f} °C, Влажность:
{hum:.2f} %")
elif report_id == HID_EVENT_REPORT_ID:
if payload:
sensor_state_changed(payload[0])
else:
print(f"[{now_str()}] Пустой EVENT report")
elif report_id == HID_FW_REPORT_ID:
fw = decode_fw_report(payload)
print(f"[{now_str()}] [FW] Версия прошивки: {fw}")
elif report_id == HID_CMD_REPORT_ID:
if payload:
process_custom_command(payload[0], payload[1:])
else:
print(f"[{now_str()}] Получен пустой CMD report")
else:
print(f"[{now_str()}] Неизвестный report id: {report_id}
({report.hex()})")
if (time.time() - last_data_time) > args.device_timeout:
print(f"[{now_str()}] Устройство не отвечает более {args.device_timeout:.1f}
секунд")
break
except IOError as e:
print(f"[{now_str()}] Ошибка чтения: {e}")
time.sleep(0.5)
except KeyboardInterrupt:
print("\nПрерывание пользователем")
finally:
dev.close()
print("Устройство закрыто")
if __name__ == "__main__":
main()