feat: enhance MeshCoreConnector with improved timeout calculation and path resolution; add PathHopResolver for better contact resolution

This commit is contained in:
zjs81
2026-06-13 00:36:45 -07:00
parent 3707acb124
commit 5ea6b17b16
8 changed files with 529 additions and 234 deletions
+50 -8
View File
@@ -39,8 +39,12 @@ class RetryServiceConfig {
final void Function(Message) updateMessage;
final Function(Contact)? clearContactPath;
final Function(Contact, Uint8List, int)? setContactPath;
final int Function(int pathLength, int messageBytes, {String? contactKey})?
calculateTimeout;
final int Function(
int pathLength,
int messageBytes, {
String? contactKey,
int? deviceTimeoutMs,
})? calculateTimeout;
final Uint8List? Function()? getSelfPublicKey;
final String Function(Contact, String)? prepareContactOutboundText;
final AppSettingsService? appSettingsService;
@@ -74,6 +78,12 @@ class RetryServiceConfig {
class MessageRetryService extends ChangeNotifier {
static const int maxAckHistorySize = 100;
/// Global cap on concurrent in-flight messages across ALL contacts.
/// The firmware's expected_ack_table is a single 8-entry circular buffer
/// shared globally; cap at 6 to leave two slots of headroom.
static const int _maxGlobalInFlight = 6;
int _maxRetries = 5;
int get maxRetries => _maxRetries;
@@ -170,8 +180,9 @@ class MessageRetryService extends ChangeNotifier {
_config?.addMessage(contact.publicKeyHex, message);
// Queue per contact — only one message in-flight at a time to avoid
// overflowing the firmware's 8-entry expected_ack_table.
// Queue per contact — one message in-flight per contact at a time, and
// bounded globally by _maxGlobalInFlight across all contacts so we never
// overflow the firmware's 8-entry global expected_ack_table.
final contactKey = contact.publicKeyHex;
_sendQueue[contactKey] ??= [];
_sendQueue[contactKey]!.add(messageId);
@@ -184,6 +195,11 @@ class MessageRetryService extends ChangeNotifier {
}
void _sendNextForContact(String contactKey) {
// Enforce the global in-flight cap before starting a new send.
// The firmware's expected_ack_table is a single 8-entry circular buffer
// shared across all contacts; exceeding it silently evicts an older slot.
if (_activeMessages.length >= _maxGlobalInFlight) return;
final queue = _sendQueue[contactKey];
if (queue == null) return;
@@ -211,7 +227,16 @@ class MessageRetryService extends ChangeNotifier {
if (_resolvedMessages.contains(messageId)) return;
_resolvedMessages.add(messageId);
_activeMessages.remove(messageId);
// Pump this contact's queue first, then any other contacts that are waiting.
_sendNextForContact(contactKey);
for (final key in _sendQueue.keys) {
if (key == contactKey) continue;
if (_activeMessages.length >= _maxGlobalInFlight) break;
final queue = _sendQueue[key];
if (queue != null && queue.isNotEmpty) {
_sendNextForContact(key);
}
}
}
PathSelection? _selectPathForAttempt(Message message, Contact contact) {
@@ -352,6 +377,10 @@ class MessageRetryService extends ChangeNotifier {
}
bool updateMessageFromSent(int ackHash, int timeoutMs) {
// Firmware sets expected_ack = 0 for CLI/command sends (TXT_TYPE_CLI_DATA).
// No ACK will ever be issued for these, so arming a retry timer is wrong.
if (ackHash == 0) return false;
final config = _config;
if (config == null) return false;
@@ -404,13 +433,19 @@ class MessageRetryService extends ChangeNotifier {
// Calculate timeout: prefer ML prediction, then device-provided, then physics fallback
final pathLengthValue = message.pathLength ?? contact.pathLength;
final outboundTextForTimeout =
config.prepareContactOutboundText?.call(contact, message.text) ??
message.text;
final messageBytesForTimeout =
utf8.encode(outboundTextForTimeout).length;
int actualTimeout = timeoutMs;
if (config.calculateTimeout != null) {
actualTimeout = config.calculateTimeout!(
pathLengthValue,
message.text.length,
messageBytesForTimeout,
contactKey: contact.publicKeyHex,
deviceTimeoutMs: timeoutMs > 0 ? timeoutMs : null,
);
}
@@ -617,7 +652,6 @@ class MessageRetryService extends ChangeNotifier {
for (final expectedHash in expectedHashes) {
if (expectedHash == ackHash) {
matchedMessageId = messageId;
matchedAttemptIndex = expectedHashes.indexOf(expectedHash);
break;
}
}
@@ -669,10 +703,18 @@ class MessageRetryService extends ChangeNotifier {
if (config?.onDeliveryObserved != null &&
tripTimeMs > 0 &&
message.pathLength != null) {
config!.onDeliveryObserved!(
final outboundTextForObserved =
config!.prepareContactOutboundText?.call(
contact,
message.text,
) ??
message.text;
final messageBytesForObserved =
utf8.encode(outboundTextForObserved).length;
config.onDeliveryObserved!(
contact.publicKeyHex,
message.pathLength!,
message.text.length,
messageBytesForObserved,
tripTimeMs,
);
}
+3 -1
View File
@@ -134,10 +134,12 @@ class PathHistoryService extends ChangeNotifier {
newWeight = (currentWeight + successIncrement).clamp(0.0, maxWeight);
} else {
newWeight = currentWeight - failureDecrement;
if (newWeight <= 0) {
if (newWeight <= 0 && failureCount >= 3) {
removePathRecord(contactPubKeyHex, selection.pathBytes);
return;
}
// Keep the record with a small floor weight until we have enough evidence
newWeight = newWeight.clamp(0.1, maxWeight);
}
_addPathRecord(
+20 -8
View File
@@ -63,12 +63,15 @@ class TimeoutPredictionService extends ChangeNotifier {
required int tripTimeMs,
int secondsSinceLastRx = 0,
}) {
final isFlood = pathLength < 0;
final observation = DeliveryObservation(
contactKey: contactKey,
pathLength: pathLength,
// Clamp to 0 for flood so the hop-count slope is learned from direct paths
// only; isFlood carries the flood signal as a separate feature.
pathLength: isFlood ? 0 : pathLength,
messageBytes: messageBytes,
secondsSinceLastRx: secondsSinceLastRx,
isFlood: pathLength < 0,
isFlood: isFlood,
deliveryMs: tripTimeMs,
timestamp: DateTime.now(),
);
@@ -76,11 +79,12 @@ class TimeoutPredictionService extends ChangeNotifier {
_observations.add(observation);
if (_observations.length > maxObservations) {
_observations.removeAt(0);
_rebuildContactStats();
} else {
_contactStats.putIfAbsent(contactKey, () => _ContactStats());
_contactStats[contactKey]!.add(tripTimeMs.toDouble());
}
_contactStats.putIfAbsent(contactKey, () => _ContactStats());
_contactStats[contactKey]!.add(tripTimeMs.toDouble());
_observationsSinceLastTrain++;
if (_observationsSinceLastTrain >= _retrainInterval &&
_observations.length >= minObservations) {
@@ -108,11 +112,14 @@ class TimeoutPredictionService extends ChangeNotifier {
try {
if (_activeFeatures.isEmpty) return null;
final flood = pathLength < 0;
final allFeatures = {
'pathLength': pathLength.toDouble(),
// Clamp to 0 for flood mirrors recordObservation so training and
// prediction see the same pathLength values; isFlood carries the signal.
'pathLength': flood ? 0.0 : pathLength.toDouble(),
'messageBytes': messageBytes.toDouble(),
'secSinceRx': secondsSinceLastRx.toDouble(),
'isFlood': pathLength < 0 ? 1.0 : 0.0,
'isFlood': flood ? 1.0 : 0.0,
};
final row = _activeFeatures.map((f) => allFeatures[f]!).toList();
@@ -164,7 +171,9 @@ class TimeoutPredictionService extends ChangeNotifier {
// (ml_algo's OLS produces all-zero coefficients for singular matrices)
final allNames = ['pathLength', 'messageBytes', 'secSinceRx', 'isFlood'];
final allExtractors = <double Function(DeliveryObservation)>[
(o) => o.pathLength.toDouble(),
// pathLength is already clamped to >=0 in recordObservation, but guard
// here as well for any observations loaded from older persisted data.
(o) => o.pathLength < 0 ? 0.0 : o.pathLength.toDouble(),
(o) => o.messageBytes.toDouble(),
(o) => o.secondsSinceLastRx.toDouble(),
(o) => o.isFlood ? 1.0 : 0.0,
@@ -215,6 +224,9 @@ class TimeoutPredictionService extends ChangeNotifier {
@override
void dispose() {
if (_persistTimer?.isActive == true) {
_storage?.saveDeliveryObservations(_observations);
}
_persistTimer?.cancel();
super.dispose();
}