{
"sensors": [
{
"id": "/DS18B20/20323939554B43120010800A",
"max": 125,
"min": -55,
"serial": "20323939554B43120010800A",
"state": 0,
"type": "DS18B20",
"unit": "℃",
"value": 25.5
}
],
"timestamp": 1528660156476
}
{
"sensors": [
{
"alias": "TEST-2080-H",
"id": "/HDC2080/20313337584D430100290008",
"max": 125,
"min": -40,
"serial": "20313337584D430100290008",
"state": 0,
"type": "HDC2080",
"unit": "℃",
"value": 28.13
},
{
"alias": "TEST-2080-H",
"id": "/HDC2080_RH/20313337584D430100290008",
"max": 100,
"min": 0,
"serial": "20313337584D430100290008",
"state": 2,
"type": "HDC2080_RH",
"unit": "%",
"value": 26.86
}
],
"timestamp": 1539980752561,
"version": "1.0.1"
}
Запрос:
~G
Ответ:
~G25.0
где 25.0 - текущая температуры термодатчика
Запрос:
~W1000
Ответ:
~F1000
~G24.0
~G24.0
...
echo "~W1000" > /dev/ttyACM0
cat USBPORT | sed 's/~G//' | socat - udp-sendto:127.0.0.1:5000
cat /dev/ttyACM1 | sed 's/~G//' | { read temp; if [[ -z "$temp" ]]; then echo 'No data'; elif [[ $(echo "$temp>30" | bc -l) -ne 0 ]]; then echo 'Overtemp'; else echo 'Normal'; fi; }
$port = new-object System.IO.Ports.SerialPort COM8, 9600, None, 8, One; $port.Open(); $data = $port.ReadLine(); $port.Close(); if (-not $data) { "no data" } else { $temp = $data -replace '~G', ''; if ([float]$temp -gt 30) { "Overtemp" } else { "Normal" } }
Запрос:
~W0
Ответ:
~F0
socat -d -d TCP4-LISTEN:6000,fork /dev/ttyACM0,raw,echo=0
import serial
import time
import argparse
import datetime # Добавляем модуль для работы с датой и временем
# Парсинг аргументов командной строки
parser = argparse.ArgumentParser(description='Опрос ODTEMP-1 командой ~G')
parser.add_argument('--port', required=True, help='Последовательный порт (например, COM3 или /dev/ttyUSB0)')
parser.add_argument('--timeout', type=float, default=1.0, help='Время между опросами в секундах (по умолчанию: 1.0)')
args = parser.parse_args()
# Открытие соединения
ser = serial.Serial(args.port, 115200, timeout=1)
try:
print(f"Подключено к {args.port}")
print(f"Опрос каждые {args.timeout} секунд")
while True:
# Получаем текущую дату и время
current_time = datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S")
ser.write(b"~G")
# Чтение ответа
data = ser.readline().decode('utf-8', errors='replace').strip()
if data.startswith('~GE'):
print(f"[{current_time}] Ошибка получения данных с сенсора")
elif data.startswith('~G'):
print(f"[{current_time}] Температура: {float(data[2:])}")
else:
print(f"[{current_time}] Неожиданный ответ: {data}")
# Ожидание перед следующим опросом
time.sleep(args.timeout)
except KeyboardInterrupt:
print("\nОпрос остановлен пользователем")
except Exception as e:
print(f"Ошибка: {e}")
finally:
# Закрытие соединения
ser.close()
print("Соединение закрыто")
Пример небольшой программы для сохранения данных в лог, который работает с интерфейсом HID вместо CDC.
Для работы нужен python3 и библиотека hidapi.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hid
import time
import sys
import re
# Константы устройства
OD_VID = 0x0483
OD_IOT_PID = 0xA26A
# HID report IDs
HID_DATA_REPORT_ID = 1
HID_EVENT_REPORT_ID = 2
HID_FW_REPORT_ID = 3
HID_CMD_REPORT_ID = 4
HID_CMD_REPORT_SIZE = 7
def available_devices():
"""
Функция возвращает список найденных HID-устройств.
"""
devices = hid.enumerate(OD_VID, OD_IOT_PID)
found_devices = []
iot_product = re.compile(r"IOT ([^\s]+)(?: HID)?")
for d in devices:
vendor = d.get('manufacturer_string', '')
product = d.get('product_string', '')
serial = d.get('serial_number', '')
print(f"Найдено устройство: {vendor} / {product} @ {serial}")
if vendor == "Open Development" and iot_product.match(product):
match = iot_product.match(product)
device_type = match.group(1)
if not serial:
serial = "S/N"
found_devices.append((device_type, serial))
else:
print("Устройство не соответствует критериям OD HID")
return found_devices
def open_device(serial_number):
"""
Открывает устройство с заданным серийным номером.
"""
try:
dev = hid.device()
devices = hid.enumerate(OD_VID, OD_IOT_PID)
if not devices:
print("Устройства не найдены")
sys.exit(0)
device_path = None
for d in devices:
if d.get('serial_number', '') == serial_number:
device_path = d['path']
break
if device_path is None:
print(f"Устройство с серийным номером {serial_number} не найдено, используем первое доступное")
device_path = devices[0]['path']
# Open the device using its path
dev.open_path(device_path)
return dev
except Exception as e:
# просто пытаемся вернуть первый попавшийся вариант
return hid.Device(OD_VID, OD_IOT_PID)
def process_data_report(data):
if len(data) >= 2:
value = int.from_bytes(data[:2], byteorder='little', signed=False)
return value / 100.0
return None
def process_custom_command(cmd, payload):
print(f"Получена нестандартная команда {cmd} с полезными данными: {payload.hex()}")
def sensor_state_changed(state):
"""
Обработка изменения состояния сенсора (HID_EVENT_REPORT_ID).
"""
print(f"Изменение состояния сенсора: {state}")
def main():
# Поиск доступных устройств
devices = available_devices()
if not devices:
print("Устройства не найдены")
sys.exit(0)
device_type, serial = devices[0]
print(f"\nОткрываем устройство: {device_type} ({serial})")
dev = open_device(serial)
try:
last_data_time = time.time() # Время последнего полученного отчета
timeout_period = 10 # Таймаут в секундах
while True:
# Пытаемся прочитать отчет (до 64 байт)
try:
report = dev.read(64, timeout=100) # либо timeout_ms в зависимости от библиотеки
if report:
report = bytes(report)
report_id = report[0]
last_data_time = time.time() # Обновляем время последних данных
if report_id == HID_DATA_REPORT_ID:
# Отчет с данными: остальные байты содержат полезную нагрузку
data_value = process_data_report(report[1:])
print(f"Температура: {data_value}")
elif report_id == HID_EVENT_REPORT_ID:
# Отчет об изменении состояния: второй байт – код состояния
state_value = report[1]
sensor_state_changed(state_value)
elif report_id == HID_FW_REPORT_ID:
# Отчет с версией прошивки:
# Второй байт содержит длину строки, начиная с третьего байта – сама строка
length = report[1]
firmware_version = report[2:2+length].decode('latin1')
print(f"[FW] Версия прошивки: {firmware_version}")
elif report_id == HID_CMD_REPORT_ID:
process_custom_command(report[1], report[2:])
else:
print(f"Неизвестный report id: {report_id}")
# Проверяем таймаут по времени с момента последних данных
if time.time() - last_data_time > timeout_period:
print(f"Устройство не отвечает более {timeout_period} секунд")
break
except IOError as e:
print(f"Ошибка чтения: {e}")
# Не выходим при ошибке чтения, продолжаем попытки
time.sleep(0.5)
except KeyboardInterrupt:
print("Прерывание пользователем")
finally:
dev.close()
print("Устройство закрыто")
if __name__ == '__main__':
main()