Hello
Welcome to my corner of the web! My name is Ignas Bukys and I am excited to share my thoughts, experiences, and insights with you.
On this blog, you will find a diverse range of topics including technology and programming. All my projects (ongoing & finished) are listed here.
Whether you are here for personal growth, to learn something new, or simply to be entertained, I hope you find value in what I have to share. Thank you for stopping by and I hope you enjoy your visit.
Latest Blog messages
uTuya: access Tuya devices from ESP32 MCUs
Controlling Tuya Smart Plugs Locally with MicroPython on ESP32
No cloud. No dependencies. Pure local LAN control.
I spent a few days reverse-engineering the Tuya local LAN protocol to get my AUBESS Smart Socket 20A plugs talking to an ESP32 running MicroPython — without relying on the cloud, without any third-party MicroPython libraries. This post documents everything I found, including the quirks that cost me the most time.
Why Local Control?
Tuya's cloud API is rate-limited, requires an internet connection, and adds roughly 500 ms of latency. For home automation on a microcontroller you want local LAN control: direct TCP on port 6668, sub-10 ms responses, and a setup that keeps working offline.
There are Python libraries for this on desktop (tinytuya), but nothing clean for MicroPython. This post fills that gap.
What You Need
- ESP32 board running MicroPython
- Tuya-based smart plug (tested: AUBESS Smart Socket 20A)
- Your plug's Device ID and Local Key (see below)
- The plug's local IP address
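Before going further, it can help to confirm from your PC that the plug actually answers on TCP port 6668 at the IP address you wrote down. The following is a minimal sketch for desktop Python; `port_open` and `TUYA_PORT` are names made up for this example, and the check only proves that something accepts connections on that port, not that it is a Tuya device.

```python
import socket

TUYA_PORT = 6668  # local control port used throughout this post


def port_open(host, port=TUYA_PORT, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection opens the socket; the with-block closes it again
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, unreachable and timed-out connections all end up here
        return False


# Example (replace with your plug's address):
# print(port_open('192.168.1.55'))
```

Close the Smart Life app first: the plug accepts only one local connection at a time, so an open app can make this check fail.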
Step 1 — Set-up developer account at Tuya Cloud
I assume you already have the Tuya app installed on your phone :) A developer account on the Tuya platform is needed because it is the only way to get a device's Local Key. A free trial account is perfectly sufficient; there is no need to pay.
Step 2 — Get Device Credentials
You need the Device ID (20 chars) and Local Key (16 chars) from Tuya's developer portal.
On your PC:
pip install tinytuya
python -m tinytuya wizard
This walks you through linking your Tuya developer account and dumps all device credentials to devices.json and other JSON files. Don't forget to check those.
⚠️ The Local Key changes every time you remove and re-add a device in the Smart Life app. Don't do that after grabbing the key.
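A quick sanity check on the copied values can save a debugging session later. This is a minimal sketch that only tests the lengths stated above (20-character Device ID, 16-character Local Key); `check_credentials` is a hypothetical helper, and IDs of other Tuya devices may not follow the same length.

```python
def check_credentials(device_id, local_key):
    """Return a list of problems; an empty list means the values look plausible."""
    problems = []
    if len(device_id) != 20:
        problems.append("device ID is %d chars, expected 20" % len(device_id))
    key_len = len(local_key.encode())
    if key_len != 16:
        # AES-128 needs exactly 16 key bytes
        problems.append("local key is %d bytes, AES-128 needs 16" % key_len)
    return problems


# Placeholder values, not real credentials
print(check_credentials("a" * 20, "b" * 16))
```

The function uses nothing beyond `len()` and string formatting, so it should run unchanged on both desktop Python and MicroPython.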
Step 3 — Understand the Protocol
Tuya v3.3 uses plain TCP on port 6668 with AES-128-ECB encrypted JSON payloads. Each frame looks like:
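[HEADER 4B][SEQUENCE 4B][COMMAND 4B][LENGTH 4B][AES-encrypted PAYLOAD][CRC32 4B][FOOTER 4B]
- Header: 00 00 55 AA
- Sequence: 00 00 00 01
- Command: 00 00 00 0a
- Length (hex): 00 00 00 78
- Payload: AES-128-ECB encrypted JSON
- CRC32: checksum over everything from the header through the payload
- Footer: 00 00 AA 55
Length counts PAYLOAD + CRC32 + FOOTER. In the example above, 0x78 = 120 bytes: a 112-byte payload plus 8 bytes of CRC32 and footer.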
All my AUBESS plugs are query-style: after you connect to the socket, the plug just waits. You must send a status query packet first.
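To make the frame layout concrete, here is a minimal sketch of a frame builder with a matching parser. It assumes a 4-byte CRC32 sits between the payload and the footer, as the library at the end of this post does, and that `binascii.crc32` is available (the library carries a pure-Python fallback for firmware builds without it). `build_frame` and `parse_frame` are illustrative names, and the payload is dummy bytes rather than real AES output.

```python
import binascii
import struct

HEADER = b"\x00\x00\x55\xaa"
FOOTER = b"\x00\x00\xaa\x55"


def build_frame(seq, cmd, payload):
    """Wrap an already-encrypted payload in a Tuya v3.3 style frame."""
    length = len(payload) + 8  # payload + CRC32 (4) + footer (4)
    body = struct.pack(">III", seq, cmd, length) + payload
    crc = binascii.crc32(HEADER + body) & 0xFFFFFFFF
    return HEADER + body + struct.pack(">I", crc) + FOOTER


def parse_frame(frame):
    """Return (seq, cmd, payload) or raise ValueError if the frame is malformed."""
    if len(frame) < 24 or frame[:4] != HEADER or frame[-4:] != FOOTER:
        raise ValueError("bad header or footer")
    seq, cmd, length = struct.unpack(">III", frame[4:16])
    if 16 + length != len(frame):
        raise ValueError("length field does not match frame size")
    crc = struct.unpack(">I", frame[-8:-4])[0]
    if crc != binascii.crc32(frame[:-8]) & 0xFFFFFFFF:
        raise ValueError("CRC mismatch")
    return seq, cmd, frame[16:16 + length - 8]


frame = build_frame(1, 0x0A, bytes(112))  # 112 dummy payload bytes
print(len(frame), frame[12:16])
```

With a 112-byte payload the length field comes out as 00 00 00 78 and the whole frame is 136 bytes, which matches the status query described above. Real device responses may carry a 4-byte return code at the start of the payload area, so `parse_frame` is only a starting point for decoding them.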
Step 4 — Another Quirk: The Version Prefix
Standard Tuya v3.3 documentation says the encrypted payload should be prefixed with 3.3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 (15 bytes: the version string plus 12 null bytes). This makes packets 151 bytes. The prefix is required when sending a control command such as set_switch.
After capturing tinytuya traffic I discovered my plugs want no prefix for the status query — 136-byte packets only. The prefix is only used for control packets (turning on/off).
This single detail isn't documented anywhere I could find.
| Packet type | Version prefix | Packet size |
| Status query (cmd 0x0a) | ❌ No prefix | 136 bytes |
| Heartbeat (cmd 0x09) | ❌ No prefix | 136 bytes |
| Control (cmd 0x07) | ✅ With prefix | 151 bytes |
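The 136 and 151 byte figures can be reproduced with a little arithmetic. This sketch assumes a 108-character query JSON (the `gwId`/`devId`/`uid`/`t` format used by the library later in this post, with a 20-character device ID and a 10-digit timestamp) and PKCS7 padding to 16-byte AES blocks. `padded_len`, `packet_size` and `add_prefix` are hypothetical helper names.

```python
VERSION_PREFIX = b"3.3" + b"\x00" * 12  # 15 bytes in total


def padded_len(n):
    """Length after PKCS7 padding to 16-byte blocks (always adds 1..16 bytes)."""
    return (n // 16 + 1) * 16


def packet_size(plaintext_len, with_prefix):
    """Total frame size for a JSON payload of plaintext_len bytes."""
    overhead = 16 + 8  # header + sequence + command + length, then CRC32 + footer
    prefix = len(VERSION_PREFIX) if with_prefix else 0
    return overhead + prefix + padded_len(plaintext_len)


def add_prefix(ciphertext):
    """Prepend the version prefix to an already-encrypted payload."""
    return VERSION_PREFIX + ciphertext


print(packet_size(108, with_prefix=False))  # status query
print(packet_size(108, with_prefix=True))   # same payload with the prefix
```

The first call gives 136 and the second 151, so the prefix accounts for exactly the 15-byte difference in the table. A control payload with a longer JSON body may pad to more blocks and end up larger.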
Step 5 — Fix the Timestamp
MicroPython's utime.time() epoch starts at 2000-01-01, not 1970-01-01 like Unix.
Tuya validates the t field in the payload and rejects queries where the timestamp is too far from real Unix time.
Fix this by syncing NTP on boot and adding the epoch offset:
- boot.py
import network, utime, ntptime

sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect('YOUR_SSID', 'YOUR_PASSWORD')
while not sta.isconnected():
    utime.sleep_ms(300)
ntptime.settime()  # sync to real UTC

# In your main script
EPOCH_OFFSET = 946684800  # seconds between 1970 and 2000

def unix_now():
    return utime.time() + EPOCH_OFFSET
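If you want to convince yourself where 946684800 comes from, it can be derived on a PC with desktop Python. This is only a verification sketch; `to_unix` is a hypothetical name that mirrors `unix_now()` above.

```python
from datetime import datetime, timezone

unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
mpy_epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)

# 30 years including 7 leap days = 10957 days of 86400 seconds each
EPOCH_OFFSET = int((mpy_epoch - unix_epoch).total_seconds())
print(EPOCH_OFFSET)  # 946684800


def to_unix(mpy_seconds):
    """Convert seconds since 2000-01-01 to seconds since 1970-01-01."""
    return mpy_seconds + EPOCH_OFFSET
```

Not every MicroPython port uses the 2000 epoch. On the board itself, `utime.gmtime(0)[0]` returns the epoch year, so you can check before adding the offset.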
DPS Data Explained
The plug returns a dps dictionary. For AUBESS Smart Socket 20A:
| Key | Unit | Description |
| 1 | bool | Switch state |
| 9 | — | Countdown timer |
| 18 | mA | Current |
| 19 | W × 10 | Power (divide by 10) |
| 20 | V × 10 | Voltage (divide by 10) |
| 21 | — | Power-on mode |
| 22–24 | — | Calibration values |
| 25 | mA | Leakage current threshold |
The mapping can be found in the files generated by the tinytuya wizard.
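As a small illustration of the scaling in the table, the sketch below converts a raw dps dictionary into engineering units. It assumes the keys arrive as strings, as in the example response shown in the library docstring; `decode_dps` and the output field names are made up for this example.

```python
def decode_dps(dps):
    """Convert raw DPS values to engineering units using the table above."""
    return {
        "on": dps.get("1"),
        "current_a": dps["18"] / 1000 if "18" in dps else None,  # mA -> A
        "power_w": dps["19"] / 10 if "19" in dps else None,      # W x 10 -> W
        "voltage_v": dps["20"] / 10 if "20" in dps else None,    # V x 10 -> V
    }


sample = {"1": True, "18": 2340, "19": 523, "20": 2301}
print(decode_dps(sample))
```

Decoding one dictionary like this needs a single status query, whereas reading voltage, current and power one by one from the device costs a network round trip each.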
Debugging Tips
- ECONNRESET immediately: the plug rejects the packet content. Check key order in JSON and version prefix.
- ETIMEDOUT: the plug ignores the packet entirely. Check timestamp (NTP sync), and whether you're using prefix or no-prefix for the right packet type.
- data format error in decrypted response: the key is correct but the payload format is wrong. Check JSON structure.
- Only one connection at a time: close the Smart Life app on your phone before connecting from ESP32.
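The first two tips can be wired into an error handler so the hint shows up right next to the exception. This is a minimal sketch; `HINTS` and `diagnose` are hypothetical names, and the hints simply restate what I observed on my plugs.

```python
import errno

HINTS = {
    errno.ECONNRESET: "plug rejected the packet: check JSON key order and version prefix",
    errno.ETIMEDOUT: "plug ignored the packet: check NTP sync/timestamp and prefix vs no-prefix",
}


def diagnose(exc):
    """Map an OSError raised by the socket to a troubleshooting hint."""
    code = getattr(exc, "errno", None)
    return HINTS.get(code, "unexpected socket error: %r" % (exc,))


# Typical use around a send/recv pair:
# try:
#     sock.send(packet)
#     raw = sock.recv(512)
# except OSError as e:
#     print(diagnose(e))
```

On desktop Python a socket timeout may surface without an errno value, in which case the generic message is returned.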
This monkey-patch script helped a lot. Using it, I was able to prepare the MicroPython library for Tuya devices.
- capture.py
# Run this on PC: python capture.py
import tinytuya
import socket
import threading
import time

# Pinguin
DEVICE_IP = '192.168.1.55'
DEVICE_ID = '1<hidden>9'
LOCAL_KEY = '5<hidden>b'

# Monkey-patch socket to capture raw bytes
_orig_connect = socket.socket.connect
_orig_send = socket.socket.send
_orig_recv = socket.socket.recv
_orig_sendall = socket.socket.sendall

def patched_connect(self, addr):
    print('[SOCKET] connect to', addr)
    return _orig_connect(self, addr)

def patched_send(self, data, *args):
    print('[SOCKET] send %d bytes:' % len(data), ' '.join('%02x' % b for b in data))
    return _orig_send(self, data, *args)

def patched_recv(self, size, *args):
    data = _orig_recv(self, size, *args)
    print('[SOCKET] recv %d bytes:' % len(data), ' '.join('%02x' % b for b in data))
    return data

def patched_sendall(self, data, *args):
    print('[SOCKET] sendall %d bytes:' % len(data), ' '.join('%02x' % b for b in data))
    return _orig_sendall(self, data, *args)

socket.socket.connect = patched_connect
socket.socket.send = patched_send
socket.socket.recv = patched_recv
socket.socket.sendall = patched_sendall

d = tinytuya.OutletDevice(
    dev_id=DEVICE_ID,
    address=DEVICE_IP,
    local_key=LOCAL_KEY,
    version=3.3
)
d.set_socketTimeout(10)
data = d.status()
print('STATUS:', data)
What Didn't Work (Save Yourself the Time)
- Sending heartbeat (0x09) before the status query: some plugs need it, mine didn't
- utime.time() without NTP sync: the plug rejects stale timestamps
- Version prefix on status query: adds 15 bytes that this plug firmware rejects
Final Notes
This was tested on AUBESS Smart Socket 20A running Tuya firmware v3.3. Other Tuya devices may behave differently — some push status on connect without needing a query, some require a heartbeat first. The packet capture approach (monkey-patching Python's socket in tinytuya) is the most reliable way to debug unknown devices.
All code runs on standard MicroPython ESP32 builds with no additional packages. The only modules used are socket, time, struct, cryptolib, binascii, errno and json (plus micropython for const), all built in.
The Complete Script
- uTuya.py
""" uTuya - MicroPython Library for Local Tuya Device Control ========================================================== A lightweight MicroPython library for controlling Tuya-compatible smart devices (smart plugs, switches, lights, etc.) over local network without cloud dependency. Author: Ignas Bukys ignas.bukys [] gmail [] com License: MIT Version: 1.0 @ 2026-05-13 Features: --------- - Local network communication (no cloud required) - AES encryption/decryption for secure device communication - Support for Tuya protocol v3.3 - Energy monitoring (voltage, current, power) for compatible devices - DPS (Data Point) read/write operations - Heartbeat support to keep connections alive - Configurable debug logging - Optional connection pooling for frequent queries - Memory-optimized with memoryview to reduce allocations Hardware Requirements: --------------------- - ESP32, ESP8266, or other MicroPython-compatible microcontroller - Tuya-compatible smart device on the same local network Dependencies: ------------ - MicroPython standard library (socket, time, struct, json) - cryptolib (MicroPython AES encryption module) Usage Example: ------------- from tuya import uTuya # Initialize device connection device = uTuya( ip='192.168.1.100', dev='your_device_id', key='your_local_key', *version=3.3, *debug=True ) * Optional # Turn switch ON device.turn_on() device.set_dps(1, True) # Turn switch OFF device.turn_off() device.set_dps(1, False) # Get energy monitoring data print(f"Voltage: {device.voltage} V") print(f"Current: {device.current} A") print(f"Power: {device.power} W") # Check switch state if device.is_on: print("Device is ON") # Get all DPS values dps = device.get_dps() print(dps) # Get specific DPS value switch_state = device.get_dps(1) # Set custom DPS value device.set_dps(1, True) # Turn on # Print formatted status device.print_status() # Send heartbeat device.heartbeat() # Use keep_alive mode for multiple operations (automatic cleanup) device_persistent = uTuya( 
ip='192.168.1.100', dev='your_device_id', key='your_local_key', keep_alive=True # Reuse socket connection ) # Multiple operations reuse the same socket device_persistent.set_switch(True) print(device_persistent.voltage) device_persistent.heartbeat() # Close persistent connection when done device_persistent.close() How to Get Device Credentials: ------------------------------ 1. Device ID and Local Key: Read on http://bukys.eu blog post how to get Extract using tool tinytuya 2. IP Address: Check your router's DHCP table or use network scanner Protocol Notes: -------------- - Query packets (CMD_QUERY): NO version prefix in payload encryption - Control packets (CMD_CONTROL): REQUIRES version prefix (e.g., '3.3') - All packets use: HEADER + BODY + CRC32 + FOOTER structure - Responses are AES-encrypted and may include version prefix Memory Optimization Notes: -------------------------- - memoryview is used to avoid unnecessary data copying - Slicing memoryview creates a view, not a copy (zero-copy operations) - Must convert memoryview to bytes when passing to functions that need bytes - Pre-allocated buffers for CRC32 calculation reduce GC pressure - In-place operations where possible to minimize allocations MIT License: ----------- Copyright (c) 2025 Ignas Bukys Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. 
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import socket, errno, time, binascii, struct import cryptolib # pyright: ignore[reportMissingImports] import json from micropython import const # pyright: ignore[reportMissingImports] # ── Socket Management ──────────────────────────────────────────────── class _TuyaSocket: """ Context manager for safe socket handling with automatic cleanup. Now handles the opening, connecting, and logic for persistent sockets. """ def __init__(self, parent): """ Initialize socket context manager. Args: parent (uTuya): Parent uTuya instance that provides IP, PORT, and stores the persistent socket state. """ self.parent = parent self.sock = None self.is_reused = False def _sock_open(self): """ Private helper to create and connect a new TCP socket. Returns socket object or None on failure. 
""" self.parent._log("Creating new socket object") sck = socket.socket() sck.settimeout(3) # Standard timeout for Tuya handshake try: self.parent._log(f"Connecting to {self.parent.ip}:{self.parent.PORT}") sck.connect((self.parent.ip, self.parent.PORT)) self.parent._log("Connected successfully") return sck except socket.error as e: # Handle common network errors with descriptive logs if e.errno == errno.EHOSTUNREACH: self.parent._log(f"Error: Host {self.parent.ip} unreachable.") elif e.errno == errno.ECONNREFUSED: self.parent._log(f"Error: Connection refused on port {self.parent.PORT}.") elif e.errno == errno.ECONNRESET: self.parent._log(f"Error: Connection reset by peer.") else: self.parent._log(f"Socket error: {e}") return None except Exception as e: self.parent._log(f"Unexpected error opening socket: {e}") return None def __enter__(self): """ Enter context: open or reuse socket connection. Returns: socket: Active socket connection Raises: OSError: If connection fails """ # 1. Try to reuse a persistent socket if keep_alive is active if self.parent.keep_alive and self.parent._socket: self.parent._log("Reusing persistent socket") self.sock = self.parent._socket self.is_reused = True return self.sock # Otherwise, open a fresh connection self.sock = self._sock_open() if self.sock is None: raise OSError("Failed to connect to device") # If keep_alive is enabled, store this socket for future reuse if self.parent.keep_alive: self.parent._socket = self.sock self.parent._log("Socket stored for keep_alive mode") self.is_reused = False return self.sock def __exit__(self, exc_type, exc_val, exc_tb): """ Exit context: close socket only if NOT in keep_alive mode or if error occurred. 
Args: exc_type: Exception type (if any) exc_val: Exception value (if any) exc_tb: Exception traceback (if any) """ # If an exception occurred, close and clear the socket even in keep_alive mode if exc_type is not None: self.parent._log(f"Exception in socket context: {exc_val}") if self.sock: try: self.sock.close() except: pass # Clear the persistent socket reference so next call opens fresh self.parent._socket = None return False # If NOT in keep_alive mode, close the socket now if not self.parent.keep_alive and self.sock: self.parent._log("Closing short-lived socket (not in keep_alive mode)") try: self.sock.close() except: pass else: self.parent._log("Socket kept open (keep_alive mode)") return False class uTuya: """ Main class for Tuya device communication over local network. Handles protocol encryption, packet crafting, and device state management. Memory-optimized with memoryview for reduced allocations. """ PORT = const(6668) # Standard Tuya device port HEADER = const(b'\x00\x00\x55\xaa') # Packet header signature FOOTER = const(b'\x00\x00\xaa\x55') # Packet footer signature CMD_HEARTBEAT = const(0x09) # Keep-alive command as heartbeay CMD_QUERY = const(0x0a) # Read device state command CMD_CONTROL = const(0x07) # Write/control command # Unix epoch (1970-01-01) vs MicroPython epoch (2000-01-01) offset in seconds # Used to convert MicroPython time.time() to Unix timestamp for Tuya protocol EPOCH_DIFF = const(946684800) def __init__(self, ip, dev, key, version=3.3, debug=False, keep_alive=False): """ Initialize Tuya device connection. 
        Args:
            ip (str): Device IP address on local network
            dev (str): Tuya device ID (gwId/devId)
            key (str|bytes): AES encryption key for device
            version (float|str): Protocol version (default '3.3')
            debug (bool): Enable debug logging (default False)
            keep_alive (bool): Maintain persistent socket connection (default False)
        """
        self.ip = ip
        self.did = dev
        # Ensure local_key is bytes for cryptolib
        self.key = key.encode() if isinstance(key, str) else key
        self.version = str(version)
        self.sequence = 0  # Packet sequence counter (incremented per request)
        self.debug = debug  # Debug logging flag
        self.keep_alive = keep_alive
        self._socket = None  # Persistent socket for keep_alive mode
        try:
            _ = binascii.crc32
            self._crc32_available = True
        except (AttributeError, ImportError):
            # No built-in CRC32 in this firmware: use the pure-Python fallback below
            self._crc32_available = False

    # ── Cryptography ────────────────────────────────────────────────────────
    def _crc32(self, data):
        """
        Calculate CRC32 checksum for packet integrity verification.
        Memory-optimized: Uses memoryview to avoid copying data when iterating.
        Uses polynomial 0xEDB88320 (reversed 0x04C11DB7) as per Tuya protocol.
        This is the same CRC32 algorithm used in ZIP, Ethernet, PNG, etc.

        Args:
            data (bytes): Data to calculate checksum for

        Returns:
            int: 32-bit CRC checksum
        """
        # use the firmware's built-in CRC32 (written in C, so faster)
        if self._crc32_available:
            return binascii.crc32(data)
        # use pure python implementation
        crc = 0xFFFFFFFF
        # Use memoryview to iterate without copying - saves memory
        mv = memoryview(data)
        for byte in mv:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xEDB88320
                else:
                    crc >>= 1
        return crc ^ 0xFFFFFFFF

    def _aes_encrypt(self, data):
        """
        Encrypt data using AES-128-ECB with PKCS7 padding.
        Tuya uses ECB mode (not CBC) for simplicity, though less secure.
        PKCS7 padding ensures data length is multiple of 16 bytes.
Args: data (str|bytes): Plaintext to encrypt Returns: bytes: Encrypted ciphertext """ if isinstance(data, str): data = data.encode() # PKCS7 padding: add N bytes of value N to reach 16-byte boundary pad = 16 - (len(data) % 16) data += bytes([pad] * pad) # cryptolib mode 1 = ECB encryption return cryptolib.aes(self.key, 1).encrypt(data) def _aes_decrypt(self, data): """ Decrypt AES-128-ECB encrypted data and remove PKCS7 padding. Args: data (bytes): Encrypted ciphertext Returns: bytes: Decrypted plaintext with padding removed """ dec = cryptolib.aes(self.key, 1).decrypt(data) # Remove PKCS7 padding (last byte indicates pad length) pad = dec[-1] if pad <= 16: # Valid padding must be 1-16 dec = dec[:-pad] return dec # ── Protocol ────────────────────────────────────────────────────────── def _craft_packet(self, cmd, payload_bytes): """ Construct a complete Tuya protocol packet. Packet structure: - HEADER (4 bytes): 0x000055aa - Sequence (4 bytes): Incrementing packet counter - Command (4 bytes): Command code (0x07, 0x09, 0x0a, etc.) - Length (4 bytes): Payload length + 8 (includes return code + payload) - Payload (variable): Encrypted data - CRC32 (4 bytes): Checksum of HEADER + body - FOOTER (4 bytes): 0x0000aa55 Args: cmd (int): Command code (CMD_QUERY, CMD_CONTROL, etc.) 
payload_bytes (bytes): Encrypted payload data Returns: bytes: Complete packet ready to send """ self.sequence += 1 # Increment for each new request # Length includes 8-byte overhead (4-byte return code + 4-byte CRC in some contexts) length = len(payload_bytes) + 8 # Body: sequence + command + length + payload body = struct.pack('>III', self.sequence, cmd, length) + payload_bytes # Calculate CRC over header + body crc = self._crc32(self.HEADER + body) & 0xFFFFFFFF packet = self.HEADER + body + struct.pack('>I', crc) + self.FOOTER self._log(f"Crafted packet: seq={self.sequence}, cmd=0x{cmd:02x}, len={length}, total={len(packet)} bytes") return packet def _encrypt_query(self, data, version_incl=False): """ Encrypt payload with optional version prefix. CRITICAL PROTOCOL DIFFERENCE: - Query packets (CMD_QUERY): NO version prefix (version_incl=False) - Control packets (CMD_CONTROL): REQUIRE version prefix (version_incl=True) Version prefix format: '3.3' + 12 null bytes + encrypted_data Args: data (str|bytes): JSON payload to encrypt version_incl (bool): Whether to prepend version header Returns: bytes: Encrypted payload (with version prefix if requested) """ self._log(f"Encrypting payload ({len(data)} bytes, version_prefix={version_incl})") enc = self._aes_encrypt(data) if version_incl: # Format: '3.3\x00\x00...\x00' (version string + 12 nulls) + encrypted # result = b'{%s}' % self.version.encode() + b'\x00' * 12 + enc result = b'%s' % self.version.encode() + b'\x00' * 12 + enc self._log(f"Added version prefix: total {len(result)} bytes") return result self._log(f"No version prefix: {len(enc)} bytes") return enc def _decrypt_response(self, resp): """ Decrypt and parse device response packet. MEMORY-OPTIMIZED: Uses memoryview to slice without copying data. 
Response may have various prefixes before the actual JSON: - 4-byte null prefix (0x00000000) - Version prefix ('3.3' + 12 nulls) Args: resp (bytes): Raw response packet from device Returns: dict|None: Parsed JSON data or None if decryption failed """ # ── Validation ── if not resp or len(resp) < 16: self._log("Response too short") return None # Create memoryview for zero-copy slicing mv = memoryview(resp) # Verify packet structure (header at start, footer at end) # Must convert memoryview slices to bytes for comparison if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER: self._log("Invalid header/footer in response") return None # ── Extract Payload ── # Length field at offset 12 includes 8-byte overhead length = struct.unpack('>I', mv[12:16])[0] # Extract payload using memoryview (zero-copy) pl_view = mv[16:16 + length - 8] # ── Strip Prefixes ── # Some responses have 4-byte null prefix (0x00000000) if len(pl_view) >= 4 and bytes(pl_view[:2]) == b'\x00\x00': pl_view = pl_view[4:] # Strip version prefix if present ('3.3' + 12 nulls = 15 bytes) if pl_view[:3] == b'3.3': pl_view = pl_view[15:] # ── Decrypt ── # Payload must be multiple of 16 (AES block size) if not pl_view or len(pl_view) % 16 != 0: self._log(f"Invalid payload length: {len(pl_view)}") return None # Convert memoryview to bytes for decryption (cryptolib needs bytes) payload = bytes(pl_view) try: dec = self._aes_decrypt(payload) # Find JSON boundaries in decrypted data # (sometimes has garbage before/after the JSON) i = dec.find(b'{') j = dec.rfind(b'}') + 1 if i >= 0 and j > i: return json.loads(dec[i:j]) except Exception as e: self._log(f"Decryption error: {e}") return None def _unused__sock_open(self): """ Open TCP socket connection to device with timeout. 
Returns: socket|None: Connected socket or None on failure """ sck = socket.socket() sck.settimeout(8) # 8-second timeout for connect/send/recv try: self._log(f"Connecting to {self.ip}:{self.PORT}") sck.connect((self.ip, self.PORT)) self._log("Connected successfully") return sck except socket.error as e: # Provide helpful error messages for common socket errors if e.errno == errno.EHOSTUNREACH: self._log(f"Error EHOSTUNREACH: Host {self.ip} is unreachable. Check your network or IP address.") elif e.errno == errno.ECONNREFUSED: self._log(f"Error ECONNREFUSED: Connection refused on {self.ip}:{self.PORT} (Service down/Wrong port).") elif e.errno == errno.ECONNRESET: self._log(f"Error ECONNRESET: Connection reset by {self.ip} (Remote host dropped the connection).") else: self._log(f"ERROR: Socket open error: {e}") return None except Exception as e: self._log(f"Unexpected error opening socket: {e}") return None def _unused__get_socket(self): """ Get socket connection (reuse if keep_alive, otherwise open new). Returns: socket|None: Active socket connection or None """ if self.keep_alive and self._socket: self._log("Reusing persistent socket") return self._socket # Open new connection sock = self._sock_open() if self.keep_alive and sock: # Store for future reuse self._socket = sock return sock def _unused__close(self): """ Close persistent socket connection. Call this when done with device if keep_alive=True to free resources. """ if self._socket: self._log("Closing persistent socket") self._socket.close() self._socket = None # ── Helpers ──────────────────────────────────────────────────────── def _unix_now(self): """ Get current Unix timestamp (seconds since 1970-01-01). MicroPython's time.time() returns seconds since 2000-01-01, so we add EPOCH_DIFF to convert to Unix time. Returns: int: Current Unix timestamp """ return int(time.time() + self.EPOCH_DIFF) def _log(self, msg): """ Print debug log message if debug mode enabled. 
Args: msg (str): Message to log """ if self.debug: print(f"[uTuya] {msg}") def _is_ack(self, raw): """ Checks if a raw response packet is a successful ACK from the device. Args: raw (bytes): The raw data received from the socket. Returns: bool: True if this appears to be a valid ACK """ # Use memoryview for zero-copy header check mv = memoryview(raw) # Minimum valid packet size for an ACK is 28 bytes: # Header(4) + Seq(4) + Cmd(4) + Len(4) + RetCode(4) + CRC(4) + Footer(4) if not raw or len(raw) < 28: self._log("ACK Check: Response too short or empty") return False # 1. Verify Packet Integrity (Header and Footer) if bytes(mv[:4]) != self.HEADER or bytes(mv[-4:]) != self.FOOTER: self._log("ACK Check: Invalid Header or Footer") return False try: # 2. Extract Return Code # In the Tuya protocol, the Return Code is the first 4 bytes of the # payload area, which starts at index 16. ret_code = struct.unpack('>I', bytes(mv[16:20]))[0] if ret_code == 0: self._log("ACK Check: Success (Return Code 0)") return True else: self._log(f"ACK Check: Device returned error code {ret_code}") return False except Exception as e: self._log(f"ACK Check: Parsing error: {e}") return False # ── Public API ──────────────────────────────────────────────────────── def get_data_raw(self): """ Query device for current state (all DPS values). Sends CMD_QUERY command with device ID and timestamp. Does NOT include version prefix for query packets. 
Returns: dict|None: Response containing 'dps' dict with all data points, or None on error Example response: { 'dps': { '1': True, # Switch state '18': 2340, # Current (mA) '19': 523, # Power (W*10) '20': 2301 # Voltage (V*10) } } """ # Construct JSON payload with device ID and timestamp t = str(self._unix_now()) payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s"}' % (self.did, self.did, self.did, t)) # Encrypt WITHOUT version prefix (critical for query packets) enc = self._encrypt_query(payload, version_incl=False) pkt = self._craft_packet(self.CMD_QUERY, enc) self._log(f"Sending QUERY command (seq={self.sequence})") try: # Context manager ensures socket is always closed properly with _TuyaSocket(self) as s: self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}") s.send(pkt) raw = s.recv(512) # Max response size self._log(f"Response ({len(raw)} bytes): {raw.hex()}") return self._decrypt_response(raw) except OSError as e: print(f"Connection error in get_data_raw: {e}") return None except Exception as e: print(f'get_data_raw error: {e}') return None def get_dps(self, key=None): """ Get DPS (Data Point) values from device. Args: key (int|str|None): Specific DPS key to retrieve, or None for all Returns: dict: All DPS values if key=None any: Specific DPS value if key provided None: On error or if key not found Example: device.get_dps() # {'1': True, '18': 2340, '19': 523, '20': 2301} device.get_dps(1) # True device.get_dps('20') # 2301 """ data = self.get_data_raw() if not data: return None dps = data.get('dps', {}) # Return specific key if requested, otherwise return all DPS return dps.get(str(key)) if key is not None else dps def set_dps(self, key, value): """ Set a specific DPS (Data Point) value on device. Generic setter for any DPS key/value pair. Useful for advanced control beyond simple switch on/off (e.g., brightness, color, timer, etc.) Note: Some devices send TWO responses: 1. ACK packet (command accepted) 2. 
Status packet (updated DPS values) Args: key (int|str): DPS key to set value (any): Value to set (bool, int, str, etc.) Returns: dict|None: Response from device or None on error Example: device.set_dps(1, True) # Turn switch on device.set_dps(2, 50) # Set brightness to 50% device.set_dps(3, "white") # Set color mode """ t = str(self._unix_now()) # Convert value to JSON string if not already a string # (booleans need lowercase 'true'/'false', not Python's 'True'/'False') val_str = json.dumps(value) if not isinstance(value, str) else value.lower() payload = ('{"gwId":"%s","devId":"%s","uid":"%s","t":"%s","dps":{"%s":%s}}' % (self.did, self.did, self.did, t, key, val_str)) # Control commands REQUIRE version prefix (critical!) enc = self._encrypt_query(payload, version_incl=True) pkt = self._craft_packet(self.CMD_CONTROL, enc) self._log(f"Sending CONTROL (seq={self.sequence}): DPS {key}={value}") try: # Context manager ensures socket is always closed properly with _TuyaSocket(self) as s: self._log(f"Request ({len(pkt)} bytes): {pkt.hex()}") s.send(pkt) # Receive the first response (The ACK is expected) raw_ack = s.recv(512) self._log(f"ACK response ({len(raw_ack)} bytes): {raw_ack.hex()}") if self._is_ack(raw_ack): self._log("Command accepted by device.") # Receive actual response (expected status update) raw = s.recv(512) self._log(f"Response ({len(raw)} bytes): {raw.hex()}") return self._decrypt_response(raw) self._log("Command failed or timed out.") return None except OSError as e: print(f"Connection error in set_dps: {e}") return None except Exception as e: print(f'set_dps error: {e}') return None def heartbeat(self): """ Send heartbeat packet to device to keep connection alive. Some devices may drop idle connections. Send periodic heartbeats to maintain persistent connections. 
        Returns:
            bool: True if heartbeat acknowledged, False on error
        """
        # Heartbeat has empty payload
        pkt = self._craft_packet(self.CMD_HEARTBEAT, b'')
        self._log(f"Sending HEARTBEAT (seq={self.sequence})")
        try:
            # Context manager ensures socket is always closed properly
            with _TuyaSocket(self) as s:
                self._log(f"Heartbeat request ({len(pkt)} bytes): {pkt.hex()}")
                s.send(pkt)
                raw = s.recv(512)
                self._log(f"Heartbeat ACK response ({len(raw)} bytes): {raw.hex()}")
                # recv() returns b'' when the peer closes the connection, never None
                return bool(raw)
        except OSError as e:
            print(f"Connection error in heartbeat: {e}")
            return False
        except Exception as e:
            print(f'heartbeat error: {e}')
            return False

    def turn_on(self):
        """Convenience method to turn device ON"""
        return self.set_dps(1, True)

    def turn_off(self):
        """Convenience method to turn device OFF"""
        return self.set_dps(1, False)

    # ── Convenience Properties ────────────────────────────────────────────
    # NOTE: Properties like voltage call self.get_dps(), which calls self.get_data_raw().
    # If you check voltage, current, and power sequentially, you are performing
    # three separate network requests. For efficiency, call get_dps() once and
    # calculate all values from the returned dict.
    @property
    def voltage(self):
        """
        Get current voltage in Volts.

        Returns:
            float|None: Voltage in V (e.g., 230.1) or None on error
        """
        dps = self.get_dps()
        return dps.get('20', 0) / 10 if dps else None

    @property
    def current(self):
        """
        Get current draw in Amperes

        Returns:
            float|None: Current in A (e.g., 2.34) or None on error
        """
        dps = self.get_dps()
        return dps.get('18', 0) / 1000 if dps else None

    @property
    def power(self):
        """
        Get power consumption in Watts.

        Returns:
            float|None: Power in W (e.g., 52.3) or None on error
        """
        dps = self.get_dps()
        return dps.get('19', 0) / 10 if dps else None

    @property
    def is_on(self):
        """
        Check if device switch is currently ON.
        Returns:
            bool|None: True if ON, False if OFF, None on error
        """
        dps = self.get_dps()
        return dps.get('1', False) if dps else None

    # ── Utility Methods ────────────────────────────────────────────────
    def print_status(self):
        """
        Print formatted device status to console.
        Displays switch state, voltage, current, power, and raw DPS values.
        Useful for debugging and monitoring.
        """
        dps = self.get_dps()
        if not dps:
            # get_dps() returns None when the query failed
            print('Device Status: no response from device')
            return
        print('-' * 40)
        print('Device Status:')
        print('-' * 40)
        print(f' Switch : {dps.get("1", "N/A")}')
        print(f' Voltage: {dps.get("20", 0) / 10:.1f} V')
        print(f' Current: {dps.get("18", 0) / 1000:.3f} A')
        print(f' Power : {dps.get("19", 0) / 10:.1f} W')
        print('-' * 40)
        print(f' Raw DPS: {dps}')
        print('-' * 40)
Geoid CC600 bicycle computer review
I like to try various gadgets for my bike. Some enhance my ride, some are just, well, meh… :)
With a new bike computer, I have logged more than 1000 kilometers this season, and I’m eager to give my opinion, based on my personal experience. The gadget I’m talking about is the Geoid CC600. The Geoid CC600 uses an ESP32 chip as its CPU. This allows for a feature set that, for its price point, is very impressive.
- 2.4" color screen with 3 physical buttons
- Backlit screen with ambient light sensor
- IPX7 waterproof rating, but I would not leave it out in the rain
- GPS, Galileo, BeiDou, GLONASS and QZSS positioning systems
- USB Type-C charging port
- 17-24 hours of battery life, depending on settings and usage scenario
- Wi-Fi, Bluetooth and ANT+ wireless protocols
- 100 hours of storage
- Supported sensors: Speed*, Cadence*, Heart rate*, Power meter, Smart trainer, Electronic shifting, Tail light, Radar*
* - Tested: I own XOSS Vortex Speed and Cadence sensors (they work over both BT and ANT+), a Garmin heart rate sensor and a W100 Radar.
The thing that stood out to me most about the device was the display. It is a bright color screen that is easy to read even in direct sunlight, which is something I have struggled with in the past. The screen’s brightness, coupled with its resolution, means that all the essential metrics and even navigation details can be read without squinting.
The CC600's ability to connect over both ANT+ and Bluetooth Low Energy (BLE) is a clear advantage. I have paired it with my heart rate monitor, speed sensor, and cadence sensor, and all of the connections have remained stable, with no surprise disconnections during my rides. The unit also offers quite a number of data fields, which lets me customize my display pages to show the information I want to see at a glance.
The navigation is breadcrumb style, which is good for following a set route. I have used it to follow GPX files uploaded from my phone, and it has worked very well. While it lacks the detailed, full-map view of more expensive devices, its simple outline track and turn-by-turn prompts are adequate to keep you on track. This is a no-frills navigation system that gets the job done. One more thing I like is that the navigation track is shown in color, which helps visibility. It is also worth mentioning that routes created in the OneLapFit application carry additional map information about side roads, which helps you work out where to make your turn. If you stray from your route, it will be recalculated, provided the OneLapFit application is open on your phone.
After 1000 kilometers, the Geoid CC600 has proven to be a reliable and capable bike computer. It's not perfect—the companion app can be a little clunky at times, and the setup was a bit cryptic initially. However, the hardware itself is solid, and for its price, it offers a great balance of features, especially for someone who primarily wants a device for tracking stats and following simple routes. If you're in the market for a budget-friendly bike computer with a vibrant display and essential features, the Geoid CC600 is definitely worth considering.
ZBT WE826: My experience in choosing and configuring a 4G router
In this post, I want to share my personal experience in choosing and configuring a 4G router for my home. A bigger project I was working on required me to install a network solution in a house, and the choice came down to the ratio of functionality to price.
Choice
Out of many choices, the ZBT WE826 model caught my eye. This router had an attractive price and seemed suitable for my needs. With a 4G module, the router costs about €60. I also found information that it supports OpenWRT software. I ordered it on AliExpress and received the package.
Challenges
However, once I started using the router, problems arose. The 4G connection worked for only 3-8 hours (quite randomly) and then disappeared. I wanted to make sure the device itself was working properly and stably, so at first I didn't change any settings and used the configuration it shipped with. I then tried to reconfigure the device according to the seller's notes and advice, but it didn't help. The seller shared a firmware image for the router based on the LEDE Reboot version from 2018. I decided to install it, but that wasn't successful either: the device worked for 8 minutes and then rebooted. Apparently some script was running.
Solutions
Finally, I decided to install the latest OpenWRT firmware version specifically for my device. I chose the “ZBT W826 T 16M factory image” version. This solution turned out to be correct and my device with the new firmware started up. As expected, the 4G modem did not work and needed to be configured. Two sources helped me do this:
Keep in mind that installing packages on the router requires an internet connection. I didn't have a wired connection, so I shared the internet from my mobile phone and made the router a Wi-Fi client. It was fairly straightforward. I managed to enable 4G support by following the documentation, then left the router running overnight to evaluate stability. To my deep disappointment, the initial situation repeated itself: the 4G connection disappeared after a few hours.
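An overnight stability check like this one is easier to judge when the drops are logged with timestamps. A minimal sketch, meant to run on any computer connected through the router. The probe target (1.1.1.1 on port 53), the 60-second interval and all function names are assumptions for illustration and were not part of my original setup.

```python
import socket
import time


def link_is_up(host="1.1.1.1", port=53, timeout=5):
    """Return True if a TCP connection to host:port succeeds (assumed probe target)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def find_outages(samples):
    """Turn [(timestamp, ok), ...] into a list of (start, end) outage spans."""
    outages = []
    start = None
    for ts, ok in samples:
        if not ok and start is None:
            start = ts                      # link just went down
        elif ok and start is not None:
            outages.append((start, ts))     # link came back
            start = None
    if start is not None:
        outages.append((start, None))       # still down at the last sample
    return outages


def monitor(duration_s, interval_s=60):
    """Probe the link every interval_s seconds for duration_s seconds."""
    samples = []
    deadline = time.time() + duration_s
    while time.time() < deadline:
        samples.append((time.time(), link_is_up()))
        time.sleep(interval_s)
    return find_outages(samples)
```

Calling monitor(8 * 3600) before going to bed would return the outage spans in the morning, which makes it clear whether the connection drops once for good or flaps on and off.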
Since I was using the latest version of OpenWRT, I decided to describe my problem on the OpenWRT forum. A forum member and guru, AndrewZ, responded quite quickly and asked for additional data from the router. After an initial analysis, he suggested changing the 4G module mode and reconfiguring the router. I read the procedure and decided to go this route. I had nothing to lose, even if I broke my router!
I reloaded the OpenWRT firmware once again to start fresh and shared the internet from my mobile phone. I switched the module to the QMI protocol using AT commands, then configured the router according to the documentation. This process was a challenge, but in the end I managed to achieve the desired result.
Result
Now I have a router that gives me low-level configuration, has wide software support, and has an active community to turn to in case something goes wrong. My ZBT WE826 router works stably and without problems. The 4G connection is stable, and the Wi-Fi coverage is very good. In addition, the form factor of the router is perfect for hanging on the wall, which is a big advantage.
Happy note
This project showed that even with an inexpensive device, good results can be achieved if you are ready to invest some time and effort. The main thing is not to give up and look for solutions, even if everything seems complicated at first.












