Msg Retry fixes, channel message fixes. Notification fixes. Make more desktop friendly. Enhance retry algo. Fix predicted location clustering add retries to reactions and fix the reactions in private DMS centralize and cleanup code in var areas

This commit is contained in:
zjs81
2026-03-20 01:54:31 -07:00
parent 53caec3e14
commit 4962a48e64
61 changed files with 4509 additions and 900 deletions
+372 -219
View File
@@ -171,6 +171,11 @@ class MeshCoreConnector extends ChangeNotifier {
// Intentionally global (not per-contact): tracks overall network activity.
// Frequent RX from any source indicates a busy network with more collisions.
DateTime _lastRxTime = DateTime.now();
DateTime _lastRadioRxTime = DateTime.fromMillisecondsSinceEpoch(0);
DateTime _lastContactMsgRxTime = DateTime.fromMillisecondsSinceEpoch(0);
static const int _radioQuietMs = 3000;
static const int _radioQuietMaxWaitMs = 3000;
static const int _contactMsgBackoffMs = 5000;
bool _batteryRequested = false;
bool _awaitingSelfInfo = false;
bool _hasReceivedDeviceInfo = false;
@@ -694,24 +699,32 @@ class MeshCoreConnector extends ChangeNotifier {
_loadChannelOrder();
// Initialize retry service callbacks
_retryService?.initialize(
sendMessageCallback: _sendMessageDirect,
addMessageCallback: _addMessage,
updateMessageCallback: _updateMessage,
clearContactPathCallback: clearContactPath,
setContactPathCallback: setContactPath,
calculateTimeoutCallback:
_retryService?.initialize(RetryServiceConfig(
sendMessage: _sendMessageDirect,
addMessage: _addMessage,
updateMessage: _updateMessage,
clearContactPath: clearContactPath,
setContactPath: setContactPath,
calculateTimeout:
(pathLength, messageBytes, {String? contactKey}) => calculateTimeout(
pathLength: pathLength,
messageBytes: messageBytes,
contactKey: contactKey,
),
getSelfPublicKeyCallback: () => _selfPublicKey,
prepareContactOutboundTextCallback: prepareContactOutboundText,
getSelfPublicKey: () => _selfPublicKey,
prepareContactOutboundText: prepareContactOutboundText,
appSettingsService: appSettingsService,
debugLogService: _appDebugLogService,
recordPathResultCallback: _recordPathResult,
onDeliveryObservedCallback:
recordPathResult: _recordPathResult,
selectRetryPath:
(contactKey, attemptIndex, maxRetries, recentSelections) =>
_selectAutoPathForAttempt(
contactKey,
attemptIndex: attemptIndex,
maxRetries: maxRetries,
recentSelections: recentSelections,
),
onDeliveryObserved:
(contactKey, pathLength, messageBytes, tripTimeMs) {
final secSinceRx = DateTime.now().difference(_lastRxTime).inSeconds;
_timeoutPredictionService?.recordObservation(
@@ -722,7 +735,9 @@ class MeshCoreConnector extends ChangeNotifier {
secondsSinceLastRx: secSinceRx,
);
},
);
));
final maxRetries = _appSettingsService?.settings.maxMessageRetries ?? 5;
_retryService?.setMaxRetries(maxRetries);
}
Future<void> loadContactCache() async {
@@ -753,22 +768,61 @@ class MeshCoreConnector extends ChangeNotifier {
}
}
void _sendMessageDirect(
Future<void> _waitForRadioQuiet() async {
// Wait for backoff after receiving a contact message (avoid collision
// with their transmission still propagating through repeaters)
final msSinceContactMsg = DateTime.now()
.difference(_lastContactMsgRxTime)
.inMilliseconds;
if (msSinceContactMsg < _contactMsgBackoffMs) {
final waitMs = _contactMsgBackoffMs - msSinceContactMsg;
debugPrint('Contact message backoff: waiting ${waitMs}ms');
await Future<void>.delayed(Duration(milliseconds: waitMs));
}
// Then wait for radio silence (no RF activity for 3s)
final msSinceRx = DateTime.now()
.difference(_lastRadioRxTime)
.inMilliseconds;
if (msSinceRx >= _radioQuietMs) return;
final deadline = DateTime.now().add(
const Duration(milliseconds: _radioQuietMaxWaitMs),
);
while (DateTime.now().isBefore(deadline)) {
final quiet = DateTime.now().difference(_lastRadioRxTime).inMilliseconds;
if (quiet >= _radioQuietMs) {
debugPrint('Radio quiet for ${quiet}ms, proceeding with send');
return;
}
await Future<void>.delayed(const Duration(milliseconds: 200));
}
debugPrint(
'Radio quiet wait exceeded ${_radioQuietMaxWaitMs}ms, sending anyway',
);
}
Future<void> _sendMessageDirect(
Contact contact,
String text,
int attempt,
int timestampSeconds,
) async {
if (!isConnected || text.isEmpty) return;
final outboundText = prepareContactOutboundText(contact, text);
await sendFrame(
buildSendTextMsgFrame(
contact.publicKey,
outboundText,
attempt: attempt,
timestampSeconds: timestampSeconds,
),
);
try {
await _waitForRadioQuiet();
final outboundText = prepareContactOutboundText(contact, text);
await sendFrame(
buildSendTextMsgFrame(
contact.publicKey,
outboundText,
attempt: attempt,
timestampSeconds: timestampSeconds,
),
);
} catch (e) {
appLogger.error('Failed to send message: $e', tag: 'Connector');
}
}
void _updateMessage(Message message) {
@@ -784,6 +838,20 @@ class MeshCoreConnector extends ChangeNotifier {
notifyListeners();
}
}
// If this is a reaction message, update the target message's reaction status
final reactionInfo = ReactionHelper.parseReaction(message.text);
if (reactionInfo != null &&
(message.status == MessageStatus.delivered ||
message.status == MessageStatus.failed)) {
final contactKey2 = pubKeyToHex(message.senderKey);
_setReactionStatus(contactKey2, reactionInfo, message.status);
_messageStore.saveMessages(
contactKey2,
_conversations[contactKey2] ?? [],
);
notifyListeners();
}
}
void _recordPathResult(
@@ -793,35 +861,68 @@ class MeshCoreConnector extends ChangeNotifier {
int? tripTimeMs,
) {
if (_pathHistoryService == null) return;
final settings = _appSettingsService?.settings;
_pathHistoryService!.recordPathResult(
contactPubKeyHex,
selection,
success: success,
tripTimeMs: tripTimeMs,
successIncrement: settings?.routeWeightSuccessIncrement ?? 0.2,
failureDecrement: settings?.routeWeightFailureDecrement ?? 0.2,
maxWeight: settings?.maxRouteWeight ?? 5.0,
);
// Flood path attribution: when a flood delivery succeeds, credit the
// contact's current device path so the route the ACK traveled back
// through gets a weight boost in the path history.
if (selection.useFlood && success) {
final contact = _contacts.cast<Contact?>().firstWhere(
(c) => c?.publicKeyHex == contactPubKeyHex,
orElse: () => null,
);
if (contact != null &&
contact.pathLength >= 0 &&
contact.path.isNotEmpty) {
_pathHistoryService!.recordFloodPathAttribution(
contactPubKeyHex: contactPubKeyHex,
pathBytes: contact.path,
hopCount: contact.pathLength,
tripTimeMs: tripTimeMs,
successIncrement: settings?.routeWeightSuccessIncrement ?? 0.2,
maxWeight: settings?.maxRouteWeight ?? 5.0,
);
}
// Request a fresh contact from the device so the next flood
// attribution uses the most up-to-date path.
if (contact != null) {
unawaited(getContactByKey(contact.publicKey));
}
}
}
Contact _applyAutoSelection(Contact contact, PathSelection? selection) {
if (selection == null ||
selection.useFlood ||
selection.pathBytes.isEmpty) {
return contact;
PathSelection? _selectAutoPathForAttempt(
String contactPubKeyHex, {
required int attemptIndex,
required int maxRetries,
List<PathSelection> recentSelections = const [],
}) {
final hasKnownPaths =
_pathHistoryService?.getRecentPaths(contactPubKeyHex).isNotEmpty ?? false;
if (!hasKnownPaths) {
return null;
}
return Contact(
publicKey: contact.publicKey,
name: contact.name,
type: contact.type,
flags: contact.flags,
pathLength: selection.hopCount >= 0
? selection.hopCount
: contact.pathLength,
path: Uint8List.fromList(selection.pathBytes),
latitude: contact.latitude,
longitude: contact.longitude,
lastSeen: contact.lastSeen,
lastMessageAt: contact.lastMessageAt,
final selection = _pathHistoryService?.selectPathForAttempt(
contactPubKeyHex,
attemptIndex: attemptIndex,
maxRetries: maxRetries,
recentSelections: recentSelections,
);
if (selection != null) {
_pathHistoryService?.recordPathAttempt(contactPubKeyHex, selection);
}
return selection;
}
Future<void> startScan({
@@ -1730,47 +1831,43 @@ class MeshCoreConnector extends ChangeNotifier {
Future<void> sendMessage(Contact contact, String text) async {
if (!isConnected || text.isEmpty) return;
// Handle auto-rotation if enabled
PathSelection? autoSelection;
if (_appSettingsService?.settings.autoRouteRotationEnabled == true) {
autoSelection = _pathHistoryService?.getNextAutoPathSelection(
// Check if this is a reaction - apply locally with pending status and route through retry service
final reactionInfo = ReactionHelper.parseReaction(text);
if (reactionInfo != null) {
_conversations.putIfAbsent(contact.publicKeyHex, () => []);
final messages = _conversations[contact.publicKeyHex]!;
// Apply reaction locally with pending status
_processOutgoingContactReaction(messages, reactionInfo, contact);
_setReactionStatus(
contact.publicKeyHex,
reactionInfo,
MessageStatus.pending,
);
if (autoSelection != null) {
_pathHistoryService?.recordPathAttempt(
contact.publicKeyHex,
autoSelection,
);
if (!autoSelection.useFlood && autoSelection.pathBytes.isNotEmpty) {
await setContactPath(
contact,
Uint8List.fromList(autoSelection.pathBytes),
autoSelection.pathBytes.length,
);
}
_messageStore.saveMessages(contact.publicKeyHex, messages);
notifyListeners();
// Route through retry service (same as normal messages)
// Don't use auto-rotation for reactions — just send directly
if (_retryService != null) {
_retryService!.sendMessageWithRetry(contact: contact, text: text);
} else {
final outboundText = prepareContactOutboundText(contact, text);
await sendFrame(buildSendTextMsgFrame(contact.publicKey, outboundText));
}
return;
}
if (_retryService != null) {
final pathBytes = _resolveOutgoingPathBytes(contact, autoSelection);
final pathLength = _resolveOutgoingPathLength(contact, autoSelection);
final selectedContact = _applyAutoSelection(contact, autoSelection);
await _retryService!.sendMessageWithRetry(
contact: selectedContact,
text: text,
pathSelection: autoSelection,
pathBytes: pathBytes,
pathLength: pathLength,
);
await _retryService!.sendMessageWithRetry(contact: contact, text: text);
} else {
// Fallback to old behavior if retry service not initialized
final pathBytes = _resolveOutgoingPathBytes(contact, autoSelection);
final pathLength = _resolveOutgoingPathLength(contact, autoSelection);
final resolved = resolvePathSelection(contact);
final message = Message.outgoing(
contact.publicKey,
text,
pathLength: pathLength,
pathBytes: pathBytes,
pathLength: resolved.useFlood ? -1 : resolved.hopCount,
pathBytes: Uint8List.fromList(resolved.pathBytes),
);
_addMessage(contact.publicKeyHex, message);
notifyListeners();
@@ -1808,6 +1905,16 @@ class MeshCoreConnector extends ChangeNotifier {
if (_activeTransport == MeshCoreTransportType.usb) {
await Future<void>.delayed(const Duration(milliseconds: 100));
}
final idx = _contacts.indexWhere(
(c) => c.publicKeyHex == contact.publicKeyHex,
);
if (idx != -1) {
_contacts[idx] = _contacts[idx].copyWith(
pathLength: customPath.length,
path: customPath,
);
notifyListeners();
}
} finally {
completer.complete();
}
@@ -1924,6 +2031,9 @@ class MeshCoreConnector extends ChangeNotifier {
await _contactStore.saveContacts(_contacts);
appLogger.info('Saved contacts to storage', tag: 'Connector');
// Update any in-flight retries so they use the new path override
_retryService?.updatePendingContact(_contacts[index]);
// If setting a specific path (not flood, not auto), also sync with device
if (pathLen != null && pathLen >= 0 && pathBytes != null) {
appLogger.info('Sending path to device...', tag: 'Connector');
@@ -1942,27 +2052,27 @@ class MeshCoreConnector extends ChangeNotifier {
final autoRotationEnabled =
_appSettingsService?.settings.autoRouteRotationEnabled == true;
if (autoRotationEnabled && contact.pathOverride == null) {
autoSelection = _pathHistoryService?.getNextAutoPathSelection(
final maxRetries = _appSettingsService?.settings.maxMessageRetries ?? 5;
autoSelection = _selectAutoPathForAttempt(
contact.publicKeyHex,
attemptIndex: 0,
maxRetries: maxRetries,
);
if (autoSelection != null) {
_pathHistoryService?.recordPathAttempt(
contact.publicKeyHex,
autoSelection,
);
}
}
final pathBytes = _resolveOutgoingPathBytes(contact, autoSelection);
final pathLength = _resolveOutgoingPathLength(contact, autoSelection) ?? -1;
final resolved = resolvePathSelection(contact, selection: autoSelection);
if (pathLength < 0) {
if (resolved.useFlood) {
await clearContactPath(contact);
} else {
await setContactPath(contact, pathBytes, pathLength);
await setContactPath(
contact,
Uint8List.fromList(resolved.pathBytes),
resolved.hopCount,
);
}
return _selectionFromPath(pathLength, pathBytes);
return resolved;
}
void trackRepeaterAck({
@@ -2626,6 +2736,7 @@ class MeshCoreConnector extends ChangeNotifier {
case pushCodeStatusResponse:
break;
case pushCodeLogRxData:
_lastRadioRxTime = DateTime.now();
_handleRxData(frame);
_handleLogRxData(frame);
break;
@@ -2929,16 +3040,17 @@ class MeshCoreConnector extends ChangeNotifier {
/// Physics-based worst-case timeout (ceiling).
int _physicsMaxTimeout(int pathLength, int airtime) {
if (pathLength < 0) {
// Match firmware: SEND_TIMEOUT_BASE_MILLIS + (FLOOD_SEND_TIMEOUT_FACTOR * airtime)
return 500 + (16 * airtime);
} else {
return 500 + ((airtime * 6 + 250) * (pathLength + 1));
}
}
/// Physics-based minimum timeout (floor): raw traversal time.
int _physicsMinTimeout(int pathLength, int airtime) {
if (pathLength < 0) {
return airtime;
// Same as max for flood — firmware uses a single formula
return 500 + (16 * airtime);
} else {
return airtime * (pathLength + 1);
}
@@ -2955,7 +3067,7 @@ class MeshCoreConnector extends ChangeNotifier {
final physicsMin = _physicsMinTimeout(pathLength, airtime);
final physicsMax = _physicsMaxTimeout(pathLength, airtime);
// Try ML-based prediction, clamped between physics bounds
// Try ML-based prediction
final secSinceRx = DateTime.now().difference(_lastRxTime).inSeconds;
final mlTimeout = _timeoutPredictionService?.predictTimeout(
contactKey: contactKey,
@@ -2964,9 +3076,14 @@ class MeshCoreConnector extends ChangeNotifier {
secondsSinceLastRx: secSinceRx,
);
if (mlTimeout != null) {
if (pathLength < 0) {
// Flood: trust ML, only enforce firmware formula as floor
return mlTimeout.clamp(physicsMin, mlTimeout);
}
return mlTimeout.clamp(physicsMin, physicsMax);
}
// No ML data — use firmware formula
return physicsMax;
}
@@ -3255,6 +3372,9 @@ class MeshCoreConnector extends ChangeNotifier {
}
if (message != null) {
if (!message.isOutgoing) {
_lastContactMsgRxTime = DateTime.now();
}
// Ignore messages from self (device hearing its own broadcast)
// BUT allow repeated messages (pathLength indicates it went through repeater)
if (_selfPublicKey != null &&
@@ -3302,7 +3422,6 @@ class MeshCoreConnector extends ChangeNotifier {
_appSettingsService != null) {
final settings = _appSettingsService!.settings;
if (settings.notificationsEnabled && settings.notifyOnNewMessage) {
// Find the contact name
if (contact?.type == advTypeChat) {
_notificationService.showMessageNotification(
contactName: contact?.name ?? 'Unknown',
@@ -3313,7 +3432,9 @@ class MeshCoreConnector extends ChangeNotifier {
} else if (contact?.type == advTypeRoom) {
_notificationService.showMessageNotification(
contactName: contact?.name ?? 'Unknown Room',
message: message.text.substring(4),
message: message.text.length > 4
? message.text.substring(4)
: message.text,
contactId: message.senderKeyHex,
badgeCount: getTotalUnreadCount(),
);
@@ -3488,6 +3609,7 @@ class MeshCoreConnector extends ChangeNotifier {
_notificationService.showChannelMessageNotification(
channelName: label,
senderName: message.senderName,
message: message.text,
channelIndex: channelIndex,
badgeCount: getTotalUnreadCount(),
@@ -3495,14 +3617,20 @@ class MeshCoreConnector extends ChangeNotifier {
}
void _handleIncomingChannelMessage(Uint8List frame) {
final message = ChannelMessage.fromFrame(frame);
if (message != null && message.channelIndex != null) {
final parsed = ChannelMessage.fromFrame(frame);
if (parsed != null && parsed.channelIndex != null) {
if (_shouldDropSelfChannelMessage(
message.senderName,
message.pathBytes,
parsed.senderName,
parsed.pathBytes,
)) {
return;
}
final contentHash = _computeContentHash(
parsed.channelIndex!,
parsed.timestamp.millisecondsSinceEpoch ~/ 1000,
'${parsed.senderName}: ${parsed.text}',
);
final message = parsed.copyWith(packetHash: contentHash);
_updateContactLastMessageAtByName(
message.senderName,
message.timestamp,
@@ -3554,6 +3682,8 @@ class MeshCoreConnector extends ChangeNotifier {
return;
}
final pktHash = _computePacketHash(packet.payloadType, packet.payload);
final message = ChannelMessage(
senderKey: null,
senderName: parsed.senderName,
@@ -3561,9 +3691,10 @@ class MeshCoreConnector extends ChangeNotifier {
timestamp: DateTime.fromMillisecondsSinceEpoch(timestampRaw * 1000),
isOutgoing: false,
status: ChannelMessageStatus.sent,
pathLength: packet.isFlood ? packet.pathBytes.length : 0,
pathLength: packet.isFlood ? packet.hopCount : 0,
pathBytes: packet.pathBytes,
channelIndex: channel.index,
packetHash: pktHash,
);
_updateContactLastMessageAtByName(
@@ -3611,21 +3742,13 @@ class MeshCoreConnector extends ChangeNotifier {
final retryService = _retryService;
if (retryService != null &&
retryService.updateMessageFromSent(
ackHash,
timeoutMs,
allowQueueFallback: false,
)) {
retryService.updateMessageFromSent(ackHash, timeoutMs)) {
return;
}
if (_markNextPendingChannelMessageSent()) {
return;
}
if (retryService != null) {
retryService.updateMessageFromSent(ackHash, timeoutMs);
}
} else {
// Fallback to old behavior
for (var messages in _conversations.values) {
@@ -4016,55 +4139,98 @@ class MeshCoreConnector extends ChangeNotifier {
ReactionInfo reactionInfo,
String contactPubKeyHex,
) {
// Find target message by computing hash and comparing
final targetHash = reactionInfo.targetHash;
final contact = _contacts.cast<Contact?>().firstWhere(
(c) => c?.publicKeyHex == contactPubKeyHex,
orElse: () => null,
);
final isRoomServer = contact?.type == advTypeRoom;
ReactionHelper.applyReaction<Message>(
messages: messages,
reactionInfo: reactionInfo,
// Incoming reactions in 1:1: match against outgoing messages only
shouldSkip: (msg) => isRoomServer != true && !msg.isOutgoing,
getTimestampSecs: (msg) => msg.timestamp.millisecondsSinceEpoch ~/ 1000,
getSenderName: (msg) =>
_resolveContactSenderName(msg, contact, isRoomServer == true),
getMessageText: (msg) => msg.text,
getReactions: (msg) => msg.reactions,
updateMessage: (i, reactions) {
messages[i] = messages[i].copyWith(reactions: reactions);
},
);
}
void _processOutgoingContactReaction(
List<Message> messages,
ReactionInfo reactionInfo,
Contact contact,
) {
final isRoomServer = contact.type == advTypeRoom;
ReactionHelper.applyReaction<Message>(
messages: messages,
reactionInfo: reactionInfo,
// Outgoing reactions in 1:1: match against incoming messages
shouldSkip: (msg) => !isRoomServer && msg.isOutgoing,
getTimestampSecs: (msg) => msg.timestamp.millisecondsSinceEpoch ~/ 1000,
getSenderName: (msg) =>
_resolveContactSenderName(msg, contact, isRoomServer),
getMessageText: (msg) => msg.text,
getReactions: (msg) => msg.reactions,
updateMessage: (i, reactions) {
messages[i] = messages[i].copyWith(reactions: reactions);
},
);
}
void _setReactionStatus(
String pubKeyHex,
ReactionInfo reactionInfo,
MessageStatus status,
) {
final messages = _conversations[pubKeyHex];
if (messages == null) return;
final contact = _contacts.cast<Contact?>().firstWhere(
(c) => c?.publicKeyHex == pubKeyHex,
orElse: () => null,
);
final isRoomServer = contact?.type == advTypeRoom;
for (int i = messages.length - 1; i >= 0; i--) {
final msg = messages[i];
// For 1:1 chats: contact reacts to my outgoing messages only
// For room servers: any message can be reacted to (multi-user)
if (!isRoomServer && !msg.isOutgoing) continue;
final timestampSecs = msg.timestamp.millisecondsSinceEpoch ~/ 1000;
// For room servers, include sender name (resolve from fourByteRoomContactKey)
// For 1:1 chats, sender is implicit (null)
String? senderName;
if (isRoomServer && !msg.isOutgoing) {
final senderContact = _contacts.cast<Contact?>().firstWhere(
(c) =>
c != null &&
_matchesPrefix(c.publicKey, msg.fourByteRoomContactKey),
orElse: () => null,
);
senderName = senderContact?.name;
} else if (isRoomServer && msg.isOutgoing) {
senderName = selfName;
}
// For 1:1, senderName stays null
final msgHash = ReactionHelper.computeReactionHash(
timestampSecs,
senderName,
_resolveContactSenderName(msg, contact, isRoomServer == true),
msg.text,
);
if (msgHash == targetHash) {
final currentReactions = Map<String, int>.from(msg.reactions);
currentReactions[reactionInfo.emoji] =
(currentReactions[reactionInfo.emoji] ?? 0) + 1;
messages[i] = msg.copyWith(reactions: currentReactions);
if (msgHash == reactionInfo.targetHash) {
final statuses = Map<String, MessageStatus>.from(msg.reactionStatuses);
statuses[reactionInfo.emoji] = status;
messages[i] = msg.copyWith(reactionStatuses: statuses);
break;
}
}
}
String? _resolveContactSenderName(
Message msg,
Contact? contact,
bool isRoomServer,
) {
if (!isRoomServer) return null;
if (!msg.isOutgoing) {
final senderContact = _contacts.cast<Contact?>().firstWhere(
(c) =>
c != null &&
_matchesPrefix(c.publicKey, msg.fourByteRoomContactKey),
orElse: () => null,
);
return senderContact?.name;
}
return selfName;
}
_RawPacket? _parseRawPacket(Uint8List raw) {
if (raw.length < 3) return null;
var index = 0;
@@ -4077,10 +4243,11 @@ class MeshCoreConnector extends ChangeNotifier {
index += 4;
}
if (raw.length <= index) return null;
final pathLen = raw[index++];
if (raw.length < index + pathLen) return null;
final pathBytes = Uint8List.fromList(raw.sublist(index, index + pathLen));
index += pathLen;
final pathLenRaw = raw[index++];
final pathByteLen = _decodePathByteLen(pathLenRaw);
if (raw.length < index + pathByteLen) return null;
final pathBytes = Uint8List.fromList(raw.sublist(index, index + pathByteLen));
index += pathByteLen;
if (raw.length <= index) return null;
final payload = Uint8List.fromList(raw.sublist(index));
@@ -4089,6 +4256,7 @@ class MeshCoreConnector extends ChangeNotifier {
routeType: routeType,
payloadType: (header >> _phTypeShift) & _phTypeMask,
payloadVer: (header >> _phVerShift) & _phVerMask,
pathLenRaw: pathLenRaw,
pathBytes: pathBytes,
payload: payload,
);
@@ -4099,6 +4267,30 @@ class MeshCoreConnector extends ChangeNotifier {
return digest[0];
}
/// Firmware-compatible packet hash: SHA256(payloadType + payload) -> first 8 bytes as hex.
String _computePacketHash(int payloadType, Uint8List payload) {
final input = Uint8List(1 + payload.length);
input[0] = payloadType;
input.setRange(1, input.length, payload);
final digest = crypto.sha256.convert(input).bytes;
return digest.sublist(0, 8).map((b) => b.toRadixString(16).padLeft(2, '0')).join();
}
/// Content-based dedup hash for sync queue messages (no raw payload available).
/// Prefixed with 'c:' to avoid collisions with packet hashes.
String _computeContentHash(int channelIdx, int timestampSecs, String fullText) {
final textBytes = utf8.encode(fullText);
final input = Uint8List(5 + textBytes.length);
input[0] = channelIdx;
input[1] = timestampSecs & 0xFF;
input[2] = (timestampSecs >> 8) & 0xFF;
input[3] = (timestampSecs >> 16) & 0xFF;
input[4] = (timestampSecs >> 24) & 0xFF;
input.setRange(5, 5 + textBytes.length, textBytes);
final digest = crypto.sha256.convert(input).bytes;
return 'c:${digest.sublist(0, 8).map((b) => b.toRadixString(16).padLeft(2, '0')).join()}';
}
Uint8List? _decryptPayload(Uint8List psk, Uint8List encrypted) {
if (encrypted.length <= _cipherMacSize) return null;
final mac = encrypted.sublist(0, _cipherMacSize);
@@ -4146,63 +4338,6 @@ class MeshCoreConnector extends ChangeNotifier {
return _ParsedText(senderName: 'Unknown', text: text);
}
Uint8List _resolveOutgoingPathBytes(
Contact contact,
PathSelection? selection,
) {
// Priority 1: Check user's path override
if (contact.pathOverride != null) {
if (contact.pathOverride! < 0) {
return Uint8List(0); // Force flood
}
return contact.pathOverrideBytes ?? Uint8List(0);
}
// Priority 2: Check device flood mode or PathSelection flood
if (contact.pathLength < 0 || selection?.useFlood == true) {
return Uint8List(0);
}
// Priority 3: Check PathSelection (auto-rotation)
if (selection != null && selection.pathBytes.isNotEmpty) {
return Uint8List.fromList(selection.pathBytes);
}
// Priority 4: Use device's discovered path
return contact.path;
}
int? _resolveOutgoingPathLength(Contact contact, PathSelection? selection) {
// Priority 1: Check user's path override
if (contact.pathOverride != null) {
return contact.pathOverride;
}
// Priority 2: Check device flood mode or PathSelection flood
if (contact.pathLength < 0 || selection?.useFlood == true) {
return -1;
}
// Priority 3: Check PathSelection (auto-rotation)
if (selection != null && selection.pathBytes.isNotEmpty) {
return selection.hopCount;
}
// Priority 4: Use device's discovered path
return contact.pathLength;
}
PathSelection _selectionFromPath(int pathLength, Uint8List pathBytes) {
if (pathLength < 0) {
return const PathSelection(pathBytes: [], hopCount: -1, useFlood: true);
}
return PathSelection(
pathBytes: pathBytes,
hopCount: pathLength,
useFlood: false,
);
}
bool _addChannelMessage(int channelIndex, ChannelMessage message) {
_channelMessages.putIfAbsent(channelIndex, () => []);
final messages = _channelMessages[channelIndex]!;
@@ -4292,6 +4427,7 @@ class MeshCoreConnector extends ChangeNotifier {
pathLength: mergedPathLength,
pathBytes: mergedPathBytes,
pathVariants: mergedPathVariants,
packetHash: existing.packetHash ?? processedMessage.packetHash,
// Mark as sent when first repeat is heard
status: promotedFromPending
? ChannelMessageStatus.sent
@@ -4326,35 +4462,38 @@ class MeshCoreConnector extends ChangeNotifier {
List<ChannelMessage> messages,
ReactionInfo reactionInfo,
) {
// Find target message by computing hash and comparing
final targetHash = reactionInfo.targetHash;
for (int i = messages.length - 1; i >= 0; i--) {
final msg = messages[i];
final timestampSecs = msg.timestamp.millisecondsSinceEpoch ~/ 1000;
final msgHash = ReactionHelper.computeReactionHash(
timestampSecs,
msg.senderName,
msg.text,
);
if (msgHash == targetHash) {
final currentReactions = Map<String, int>.from(msg.reactions);
currentReactions[reactionInfo.emoji] =
(currentReactions[reactionInfo.emoji] ?? 0) + 1;
messages[i] = msg.copyWith(reactions: currentReactions);
ReactionHelper.applyReaction<ChannelMessage>(
messages: messages,
reactionInfo: reactionInfo,
shouldSkip: (_) => false,
getTimestampSecs: (msg) => msg.timestamp.millisecondsSinceEpoch ~/ 1000,
getSenderName: (msg) => msg.senderName,
getMessageText: (msg) => msg.text,
getReactions: (msg) => msg.reactions,
updateMessage: (i, reactions) {
messages[i] = messages[i].copyWith(reactions: reactions);
notifyListeners();
break;
}
}
},
);
}
int _findChannelRepeatIndex(
List<ChannelMessage> messages,
ChannelMessage incoming,
) {
// First pass: match by packet hash (exact dedup)
final incomingHash = incoming.packetHash;
if (incomingHash != null) {
for (int i = messages.length - 1; i >= 0; i--) {
final existingHash = messages[i].packetHash;
if (existingHash != null && existingHash == incomingHash) {
return i;
}
}
}
// Second pass: heuristic fallback (outgoing echo, old messages without hash)
for (int i = messages.length - 1; i >= 0; i--) {
final existing = messages[i];
if (_isChannelRepeat(existing, incoming)) {
if (_isChannelRepeat(messages[i], incoming)) {
return i;
}
}
@@ -4368,7 +4507,7 @@ class MeshCoreConnector extends ChangeNotifier {
(existing.timestamp.millisecondsSinceEpoch -
incoming.timestamp.millisecondsSinceEpoch)
.abs();
if (diffMs > 5000) return false;
if (diffMs > 30000) return false;
if (existing.senderName == incoming.senderName) return true;
@@ -4613,8 +4752,9 @@ class MeshCoreConnector extends ChangeNotifier {
packet.skipBytes(4); // Skip transport-specific bytes
}
//final payloadVer = (header >> 6) & 0x03;
final pathLen = packet.readByte();
final pathBytes = packet.readBytes(pathLen);
final pathLenRaw = packet.readByte();
final pathByteLen = _decodePathByteLen(pathLenRaw);
final pathBytes = packet.readBytes(pathByteLen);
final payload = packet.readBytes(packet.remaining);
final rawPacket = frame.sublist(3);
@@ -4652,8 +4792,9 @@ class MeshCoreConnector extends ChangeNotifier {
packet.skipBytes(4); // Skip transport-specific bytes
}
//final payloadVer = (header >> 6) & 0x03;
final pathLen = packet.readByte();
pathBytes = packet.readBytes(pathLen);
final pathLenRaw = packet.readByte();
final pathByteLen = _decodePathByteLen(pathLenRaw);
pathBytes = packet.readBytes(pathByteLen);
} catch (e) {
appLogger.warn('Malformed RX frame: $e', tag: 'Connector');
return;
@@ -4990,11 +5131,20 @@ const int _routeTransportDirect = 0x03;
const int _payloadTypeGroupText = 0x05;
const int _cipherMacSize = 2;
/// Decodes the firmware's encoded path_len byte into actual byte length.
/// Bits 0-5: hash count (0-63), Bits 6-7: hash size code (0=1byte, 1=2bytes, 2=3bytes).
int _decodePathByteLen(int pathLenRaw) {
final hashCount = pathLenRaw & 63;
final hashSize = ((pathLenRaw >> 6) & 0x03) + 1;
return hashCount * hashSize;
}
class _RawPacket {
final int header;
final int routeType;
final int payloadType;
final int payloadVer;
final int pathLenRaw;
final Uint8List pathBytes;
final Uint8List payload;
@@ -5003,12 +5153,15 @@ class _RawPacket {
required this.routeType,
required this.payloadType,
required this.payloadVer,
required this.pathLenRaw,
required this.pathBytes,
required this.payload,
});
bool get isFlood =>
routeType == _routeFlood || routeType == _routeTransportFlood;
int get hopCount => pathLenRaw & 63;
}
class _ParsedText {
+2 -2
View File
@@ -509,7 +509,7 @@ Uint8List buildSendTextMsgFrame(
final writer = BufferWriter();
writer.writeByte(cmdSendTxtMsg);
writer.writeByte(txtTypePlain);
writer.writeByte(attempt.clamp(0, 3));
writer.writeByte(attempt.clamp(0, 255));
writer.writeUInt32LE(timestamp);
writer.writeBytes(recipientPubKey.sublist(0, 6));
writer.writeString(text);
@@ -838,7 +838,7 @@ Uint8List buildSendCliCommandFrame(
final writer = BufferWriter();
writer.writeByte(cmdSendTxtMsg);
writer.writeByte(txtTypeCliData);
writer.writeByte(attempt.clamp(0, 3));
writer.writeByte(attempt.clamp(0, 255));
writer.writeUInt32LE(timestamp);
writer.writeBytes(repeaterPubKey.sublist(0, 6));
writer.writeString(command);