Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions meshtastic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
pskToString,
stripnl,
message_to_json,
generate_channel_hash,
to_node_num,
)

Expand Down Expand Up @@ -1018,3 +1019,20 @@ def ensureSessionKey(self):
nodeid = to_node_num(self.nodeNum)
if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)

def get_channels_with_hash(self):
"""Return a list of dicts with channel info and hash."""
result = []
if self.channels:
for c in self.channels:
if c.settings and hasattr(c.settings, "name") and hasattr(c.settings, "psk"):
hash_val = generate_channel_hash(c.settings.name, c.settings.psk)
else:
hash_val = None
result.append({
"index": c.index,
"role": channel_pb2.Channel.Role.Name(c.role),
"name": c.settings.name if c.settings and hasattr(c.settings, "name") else "",
"hash": hash_val,
})
return result
45 changes: 45 additions & 0 deletions meshtastic/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
from meshtastic.supported_device import SupportedDevice
from meshtastic.protobuf import mesh_pb2
from meshtastic.util import (
DEFAULT_KEY,
Timeout,
active_ports_on_supported_devices,
camel_to_snake,
catchAndIgnore,
channel_hash,
convert_mac_addr,
eliminate_duplicate_port,
findPorts,
fixme,
fromPSK,
fromStr,
generate_channel_hash,
genPSK256,
hexstr,
ipstr,
Expand Down Expand Up @@ -670,3 +673,45 @@ def test_shorthex():
assert result == b'\x05'
result = fromStr('0xffff')
assert result == b'\xff\xff'

def test_channel_hash_basics():
"Test the default key and LongFast with channel_hash"
assert channel_hash(DEFAULT_KEY) == 2
assert channel_hash("LongFast".encode("utf-8")) == 10

@given(st.text(min_size=1, max_size=12))
def test_channel_hash_fuzz(channel_name):
"Test channel_hash with fuzzed channel names, ensuring it produces single-byte values"
hashed = channel_hash(channel_name.encode("utf-8"))
assert 0 <= hashed <= 0xFF

def test_generate_channel_hash_basics():
"Test the default key and LongFast/MediumFast with generate_channel_hash"
assert generate_channel_hash("LongFast", "AQ==") == 8
assert generate_channel_hash("LongFast", bytes([1])) == 8
assert generate_channel_hash("LongFast", DEFAULT_KEY) == 8
assert generate_channel_hash("MediumFast", DEFAULT_KEY) == 31

@given(st.text(min_size=1, max_size=12))
def test_generate_channel_hash_fuzz_default_key(channel_name):
"Test generate_channel_hash with fuzzed channel names and the default key, ensuring it produces single-byte values"
hashed = generate_channel_hash(channel_name, DEFAULT_KEY)
assert 0 <= hashed <= 0xFF

@given(st.text(min_size=1, max_size=12), st.binary(min_size=1, max_size=1))
def test_generate_channel_hash_fuzz_simple(channel_name, key_bytes):
"Test generate_channel_hash with fuzzed channel names and one-byte keys, ensuring it produces single-byte values"
hashed = generate_channel_hash(channel_name, key_bytes)
assert 0 <= hashed <= 0xFF

@given(st.text(min_size=1, max_size=12), st.binary(min_size=16, max_size=16))
def test_generate_channel_hash_fuzz_aes128(channel_name, key_bytes):
"Test generate_channel_hash with fuzzed channel names and 128-bit keys, ensuring it produces single-byte values"
hashed = generate_channel_hash(channel_name, key_bytes)
assert 0 <= hashed <= 0xFF

@given(st.text(min_size=1, max_size=12), st.binary(min_size=32, max_size=32))
def test_generate_channel_hash_fuzz_aes256(channel_name, key_bytes):
"Test generate_channel_hash with fuzzed channel names and 256-bit keys, ensuring it produces single-byte values"
hashed = generate_channel_hash(channel_name, key_bytes)
assert 0 <= hashed <= 0xFF
26 changes: 26 additions & 0 deletions meshtastic/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

logger = logging.getLogger(__name__)

DEFAULT_KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==".encode("utf-8"))

def quoteBooleans(a_string: str) -> str:
"""Quote booleans
given a string that contains ": true", replace with ": 'true'" (or false)
Expand Down Expand Up @@ -365,6 +367,30 @@ def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict:
remove_keys_from_dict(keys, val)
return adict

def channel_hash(data: bytes) -> int:
"""Compute an XOR hash from bytes for channel evaluation."""
result = 0
for char in data:
result ^= char
return result

def generate_channel_hash(name: Union[str, bytes], key: Union[str, bytes]) -> int:
"""generate the channel number by hashing the channel name and psk (accepts str or bytes for both)"""
# Handle key as str or bytes
if isinstance(key, str):
key = base64.b64decode(key.replace("-", "+").replace("_", "/").encode("utf-8"))

if len(key) == 1:
key = DEFAULT_KEY[:-1] + key

# Handle name as str or bytes
if isinstance(name, str):
name = name.encode("utf-8")

h_name = channel_hash(name)
h_key = channel_hash(key)
result: int = h_name ^ h_key
return result

def hexstr(barray: bytes) -> str:
"""Print a string of hex digits"""
Expand Down
Loading