mirror of
https://github.com/zjs81/meshcore-open.git
synced 2026-06-24 03:14:30 +10:00
Merge remote-tracking branch 'origin/main' into ez_group_dropdown3
# Conflicts: # lib/main.dart
This commit is contained in:
@@ -44,6 +44,12 @@ class MessageRetryService extends ChangeNotifier {
|
||||
[]; // Rolling buffer of recent ACK hashes
|
||||
final Map<String, List<String>> _pendingMessageQueuePerContact =
|
||||
{}; // contactPubKeyHex → FIFO queue of messageIds (DEPRECATED - will be removed)
|
||||
final Map<String, List<String>> _sendQueue =
|
||||
{}; // contactPubKeyHex → ordered list of messageIds awaiting send
|
||||
final Set<String> _activeMessages =
|
||||
{}; // messageIds currently in-flight (sent/retrying)
|
||||
final Set<String> _resolvedMessages =
|
||||
{}; // messageIds already resolved (prevents double _onMessageResolved)
|
||||
final Map<String, String> _expectedHashToMessageId =
|
||||
{}; // expectedAckHashHex → messageId (for matching RESP_CODE_SENT by hash)
|
||||
|
||||
@@ -52,12 +58,13 @@ class MessageRetryService extends ChangeNotifier {
|
||||
Function(Message)? _updateMessageCallback;
|
||||
Function(Contact)? _clearContactPathCallback;
|
||||
Function(Contact, Uint8List, int)? _setContactPathCallback;
|
||||
Function(int, int)? _calculateTimeoutCallback;
|
||||
Function(int, int, {String? contactKey})? _calculateTimeoutCallback;
|
||||
Uint8List? Function()? _getSelfPublicKeyCallback;
|
||||
String Function(Contact, String)? _prepareContactOutboundTextCallback;
|
||||
AppSettingsService? _appSettingsService;
|
||||
AppDebugLogService? _debugLogService;
|
||||
Function(String, PathSelection, bool, int?)? _recordPathResultCallback;
|
||||
Function(String, int, int, int)? _onDeliveryObservedCallback;
|
||||
|
||||
MessageRetryService();
|
||||
|
||||
@@ -67,12 +74,20 @@ class MessageRetryService extends ChangeNotifier {
|
||||
required Function(Message) updateMessageCallback,
|
||||
Function(Contact)? clearContactPathCallback,
|
||||
Function(Contact, Uint8List, int)? setContactPathCallback,
|
||||
Function(int pathLength, int messageBytes)? calculateTimeoutCallback,
|
||||
Function(int pathLength, int messageBytes, {String? contactKey})?
|
||||
calculateTimeoutCallback,
|
||||
Uint8List? Function()? getSelfPublicKeyCallback,
|
||||
String Function(Contact, String)? prepareContactOutboundTextCallback,
|
||||
AppSettingsService? appSettingsService,
|
||||
AppDebugLogService? debugLogService,
|
||||
Function(String, PathSelection, bool, int?)? recordPathResultCallback,
|
||||
Function(
|
||||
String contactKey,
|
||||
int pathLength,
|
||||
int messageBytes,
|
||||
int tripTimeMs,
|
||||
)?
|
||||
onDeliveryObservedCallback,
|
||||
}) {
|
||||
_sendMessageCallback = sendMessageCallback;
|
||||
_addMessageCallback = addMessageCallback;
|
||||
@@ -85,6 +100,7 @@ class MessageRetryService extends ChangeNotifier {
|
||||
_appSettingsService = appSettingsService;
|
||||
_debugLogService = debugLogService;
|
||||
_recordPathResultCallback = recordPathResultCallback;
|
||||
_onDeliveryObservedCallback = onDeliveryObservedCallback;
|
||||
}
|
||||
|
||||
/// Compute expected ACK hash using same algorithm as firmware:
|
||||
@@ -156,7 +172,49 @@ class MessageRetryService extends ChangeNotifier {
|
||||
_addMessageCallback!(contact.publicKeyHex, message);
|
||||
}
|
||||
|
||||
await _attemptSend(messageId);
|
||||
// Queue per contact — only one message in-flight at a time to avoid
|
||||
// overflowing the firmware's 8-entry expected_ack_table.
|
||||
final contactKey = contact.publicKeyHex;
|
||||
_sendQueue[contactKey] ??= [];
|
||||
_sendQueue[contactKey]!.add(messageId);
|
||||
|
||||
if (!_activeMessages.any(
|
||||
(id) => _pendingContacts[id]?.publicKeyHex == contactKey,
|
||||
)) {
|
||||
_sendNextForContact(contactKey);
|
||||
}
|
||||
}
|
||||
|
||||
void _sendNextForContact(String contactKey) {
|
||||
final queue = _sendQueue[contactKey];
|
||||
if (queue == null) return;
|
||||
|
||||
// Drain stale entries iteratively instead of recursing.
|
||||
while (queue.isNotEmpty) {
|
||||
final messageId = queue.removeAt(0);
|
||||
if (_pendingMessages.containsKey(messageId)) {
|
||||
_activeMessages.add(messageId);
|
||||
_attemptSend(messageId).catchError((e) {
|
||||
debugPrint('_attemptSend threw for $messageId: $e');
|
||||
final msg = _pendingMessages[messageId];
|
||||
if (msg != null) {
|
||||
final failed = msg.copyWith(status: MessageStatus.failed);
|
||||
_pendingMessages[messageId] = failed;
|
||||
_updateMessageCallback?.call(failed);
|
||||
}
|
||||
_onMessageResolved(messageId, contactKey);
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Message was cancelled/cleaned up while queued — try next
|
||||
}
|
||||
}
|
||||
|
||||
void _onMessageResolved(String messageId, String contactKey) {
|
||||
if (_resolvedMessages.contains(messageId)) return;
|
||||
_resolvedMessages.add(messageId);
|
||||
_activeMessages.remove(messageId);
|
||||
_sendNextForContact(contactKey);
|
||||
}
|
||||
|
||||
Future<void> _attemptSend(String messageId) async {
|
||||
@@ -169,13 +227,11 @@ class MessageRetryService extends ChangeNotifier {
|
||||
// Use the path that was captured when the message was first sent
|
||||
if (_setContactPathCallback != null && _clearContactPathCallback != null) {
|
||||
if (message.pathLength != null && message.pathLength! < 0) {
|
||||
// Flood mode - clear the path
|
||||
debugPrint(
|
||||
'Setting flood mode for retry attempt ${message.retryCount}',
|
||||
);
|
||||
_clearContactPathCallback!(contact);
|
||||
await _clearContactPathCallback!(contact);
|
||||
} else if (message.pathLength != null && message.pathLength! >= 0) {
|
||||
// Specific path (including direct neighbor with pathLength=0)
|
||||
final pathStr = message.pathBytes.isEmpty
|
||||
? 'direct'
|
||||
: message.pathBytes
|
||||
@@ -192,6 +248,24 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
}
|
||||
|
||||
// Re-validate after async gap — a timer or ACK could have resolved/retried
|
||||
// this message while we were awaiting the path callback.
|
||||
final currentMessage = _pendingMessages[messageId];
|
||||
if (currentMessage == null || _resolvedMessages.contains(messageId)) {
|
||||
debugPrint(
|
||||
'_attemptSend: message $messageId resolved during path sync, aborting',
|
||||
);
|
||||
return;
|
||||
}
|
||||
// If the message was retried by a timer during our await, the retryCount
|
||||
// will have advanced. Only proceed if it still matches the attempt we started.
|
||||
if (currentMessage.retryCount != message.retryCount) {
|
||||
debugPrint(
|
||||
'_attemptSend: message $messageId retryCount changed during path sync, aborting',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
final attempt = message.retryCount.clamp(0, 3);
|
||||
final timestampSeconds = message.timestamp.millisecondsSinceEpoch ~/ 1000;
|
||||
|
||||
@@ -231,6 +305,15 @@ class MessageRetryService extends ChangeNotifier {
|
||||
|
||||
if (_sendMessageCallback != null) {
|
||||
_sendMessageCallback!(contact, message.text, attempt, timestampSeconds);
|
||||
} else {
|
||||
// No send callback — message would be stuck forever. Fail it immediately.
|
||||
debugPrint(
|
||||
'_attemptSend: no sendMessageCallback, failing message $messageId',
|
||||
);
|
||||
final failedMessage = message.copyWith(status: MessageStatus.failed);
|
||||
_pendingMessages[messageId] = failedMessage;
|
||||
_updateMessageCallback?.call(failedMessage);
|
||||
_onMessageResolved(messageId, contact.publicKeyHex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,6 +364,7 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
|
||||
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
|
||||
// Only match within a single contact's queue to avoid cross-contact mismatches.
|
||||
if (messageId == null && allowQueueFallback) {
|
||||
_debugLogService?.warn(
|
||||
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
|
||||
@@ -290,13 +374,16 @@ class MessageRetryService extends ChangeNotifier {
|
||||
'Hash-based match failed for $ackHashHex, falling back to queue-based matching',
|
||||
);
|
||||
|
||||
for (var entry in _pendingMessageQueuePerContact.entries) {
|
||||
// Search all contact queues so concurrent chats don't miss matches.
|
||||
final queuesToSearch = _pendingMessageQueuePerContact;
|
||||
|
||||
for (var entry in queuesToSearch.entries) {
|
||||
final contactKey = entry.key;
|
||||
final queue = entry.value;
|
||||
|
||||
if (queue.isNotEmpty) {
|
||||
// Drain stale entries until we find a valid one or exhaust the queue.
|
||||
while (queue.isNotEmpty) {
|
||||
final candidateMessageId = queue.removeAt(0);
|
||||
|
||||
if (_pendingMessages.containsKey(candidateMessageId)) {
|
||||
messageId = candidateMessageId;
|
||||
contact = _pendingContacts[candidateMessageId];
|
||||
@@ -304,21 +391,10 @@ class MessageRetryService extends ChangeNotifier {
|
||||
'Queue-based match (fallback): $ackHashHex → message $messageId for $contactKey',
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
debugPrint('Dequeued stale message $candidateMessageId - skipping');
|
||||
if (queue.isNotEmpty) {
|
||||
final nextMessageId = queue.removeAt(0);
|
||||
if (_pendingMessages.containsKey(nextMessageId)) {
|
||||
messageId = nextMessageId;
|
||||
contact = _pendingContacts[nextMessageId];
|
||||
debugPrint(
|
||||
'Queue-based match (fallback): $ackHashHex → message $messageId',
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
debugPrint('Dequeued stale message $candidateMessageId - skipping');
|
||||
}
|
||||
if (messageId != null) break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,25 +433,33 @@ class MessageRetryService extends ChangeNotifier {
|
||||
);
|
||||
}
|
||||
|
||||
// Use device-provided timeout, or calculate from radio settings if timeout is 0 or invalid
|
||||
// Calculate timeout: prefer ML prediction, then device-provided, then physics fallback
|
||||
int pathLengthValue;
|
||||
if (selection != null) {
|
||||
pathLengthValue = selection.useFlood ? -1 : selection.hopCount;
|
||||
if (pathLengthValue < 0) pathLengthValue = contact.pathLength;
|
||||
} else if (message.pathLength != null) {
|
||||
pathLengthValue = message.pathLength!;
|
||||
} else {
|
||||
pathLengthValue = contact.pathLength;
|
||||
}
|
||||
|
||||
int actualTimeout = timeoutMs;
|
||||
if (timeoutMs <= 0 && _calculateTimeoutCallback != null) {
|
||||
int pathLengthValue;
|
||||
if (selection != null) {
|
||||
pathLengthValue = selection.useFlood ? -1 : selection.hopCount;
|
||||
if (pathLengthValue < 0) pathLengthValue = contact.pathLength;
|
||||
} else if (message.pathLength != null) {
|
||||
pathLengthValue = message.pathLength!;
|
||||
} else {
|
||||
pathLengthValue = contact.pathLength;
|
||||
}
|
||||
actualTimeout = _calculateTimeoutCallback!(
|
||||
if (_calculateTimeoutCallback != null) {
|
||||
final calculated = _calculateTimeoutCallback!(
|
||||
pathLengthValue,
|
||||
message.text.length,
|
||||
contactKey: contact.publicKeyHex,
|
||||
);
|
||||
debugPrint(
|
||||
'Using calculated timeout: ${actualTimeout}ms for path length $pathLengthValue',
|
||||
);
|
||||
// calculateTimeout tries ML first, falls back to physics.
|
||||
// Use calculated value if device didn't provide one, or if ML
|
||||
// produced a tighter prediction than the device's estimate.
|
||||
if (timeoutMs <= 0 || calculated < timeoutMs) {
|
||||
actualTimeout = calculated;
|
||||
debugPrint(
|
||||
'Using calculated timeout: ${actualTimeout}ms for path length $pathLengthValue',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
final updatedMessage = message.copyWith(
|
||||
@@ -463,22 +547,7 @@ class MessageRetryService extends ChangeNotifier {
|
||||
} else {
|
||||
// Max retries reached - mark as failed
|
||||
final failedMessage = message.copyWith(status: MessageStatus.failed);
|
||||
|
||||
// Move ACK hashes to history before removing
|
||||
_moveAckHashesToHistory(messageId);
|
||||
|
||||
_pendingMessages.remove(messageId);
|
||||
_pendingContacts.remove(messageId);
|
||||
_pendingPathSelections.remove(messageId);
|
||||
_timeoutTimers[messageId]?.cancel();
|
||||
_timeoutTimers.remove(messageId);
|
||||
|
||||
// Clean up the queue entry for this contact
|
||||
_pendingMessageQueuePerContact[contact.publicKeyHex]?.remove(messageId);
|
||||
if (_pendingMessageQueuePerContact[contact.publicKeyHex]?.isEmpty ??
|
||||
false) {
|
||||
_pendingMessageQueuePerContact.remove(contact.publicKeyHex);
|
||||
}
|
||||
_pendingMessages[messageId] = failedMessage;
|
||||
|
||||
// Check if we should clear the path on max retry
|
||||
if (_appSettingsService?.settings.clearPathOnMaxRetry == true &&
|
||||
@@ -499,6 +568,30 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
|
||||
notifyListeners();
|
||||
|
||||
// Message is done retrying — send next queued message for this contact
|
||||
_onMessageResolved(messageId, contact.publicKeyHex);
|
||||
|
||||
// Keep message in pending maps for 30s grace period so late ACKs
|
||||
// can still match and update the message to delivered.
|
||||
_timeoutTimers[messageId] = Timer(const Duration(seconds: 30), () {
|
||||
_moveAckHashesToHistory(messageId);
|
||||
// Clean up ALL hash mappings for this message
|
||||
_ackHashToMessageId.removeWhere(
|
||||
(_, mapping) => mapping.messageId == messageId,
|
||||
);
|
||||
_expectedHashToMessageId.removeWhere((_, msgId) => msgId == messageId);
|
||||
_pendingMessages.remove(messageId);
|
||||
_pendingContacts.remove(messageId);
|
||||
_pendingPathSelections.remove(messageId);
|
||||
_timeoutTimers.remove(messageId);
|
||||
_resolvedMessages.remove(messageId);
|
||||
final contactKey = contact.publicKeyHex;
|
||||
_pendingMessageQueuePerContact[contactKey]?.remove(messageId);
|
||||
if (_pendingMessageQueuePerContact[contactKey]?.isEmpty ?? false) {
|
||||
_pendingMessageQueuePerContact.remove(contactKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -594,7 +687,15 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
|
||||
if (matchedMessageId != null) {
|
||||
final message = _pendingMessages[matchedMessageId]!;
|
||||
final message = _pendingMessages[matchedMessageId];
|
||||
if (message == null) {
|
||||
// Message was already cleaned up (e.g. grace period expired)
|
||||
_ackHashToMessageId.remove(ackHashHex);
|
||||
debugPrint(
|
||||
'ACK matched $matchedMessageId but message already cleaned up',
|
||||
);
|
||||
return;
|
||||
}
|
||||
final contact = _pendingContacts[matchedMessageId];
|
||||
final selection = _pendingPathSelections[matchedMessageId];
|
||||
|
||||
@@ -616,12 +717,21 @@ class MessageRetryService extends ChangeNotifier {
|
||||
tripTimeMs: tripTimeMs,
|
||||
);
|
||||
|
||||
// Clean up ALL hash mappings for this message (from all retry attempts)
|
||||
_ackHashToMessageId.removeWhere(
|
||||
(_, mapping) => mapping.messageId == matchedMessageId,
|
||||
);
|
||||
_expectedHashToMessageId.removeWhere(
|
||||
(_, msgId) => msgId == matchedMessageId,
|
||||
);
|
||||
|
||||
// Move ACK hashes to history before removing
|
||||
_moveAckHashesToHistory(matchedMessageId);
|
||||
|
||||
_pendingMessages.remove(matchedMessageId);
|
||||
_pendingContacts.remove(matchedMessageId);
|
||||
_pendingPathSelections.remove(matchedMessageId);
|
||||
_resolvedMessages.remove(matchedMessageId);
|
||||
|
||||
// Clean up the queue entry for this contact (remove any remaining references to this message)
|
||||
if (contact != null) {
|
||||
@@ -646,6 +756,17 @@ class MessageRetryService extends ChangeNotifier {
|
||||
true,
|
||||
tripTimeMs,
|
||||
);
|
||||
if (_onDeliveryObservedCallback != null &&
|
||||
tripTimeMs > 0 &&
|
||||
message.pathLength != null) {
|
||||
_onDeliveryObservedCallback!(
|
||||
contact.publicKeyHex,
|
||||
message.pathLength!,
|
||||
message.text.length,
|
||||
tripTimeMs,
|
||||
);
|
||||
}
|
||||
_onMessageResolved(matchedMessageId, contact.publicKeyHex);
|
||||
}
|
||||
|
||||
notifyListeners();
|
||||
@@ -783,6 +904,9 @@ class MessageRetryService extends ChangeNotifier {
|
||||
_ackHistory.clear();
|
||||
_ackHashToMessageId.clear();
|
||||
_pendingMessageQueuePerContact.clear();
|
||||
_sendQueue.clear();
|
||||
_activeMessages.clear();
|
||||
_resolvedMessages.clear();
|
||||
super.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,7 +232,9 @@ class NotificationService {
|
||||
|
||||
try {
|
||||
await _notifications.show(
|
||||
id: contactId?.hashCode ?? DateTime.now().millisecondsSinceEpoch,
|
||||
id: contactId != null
|
||||
? 'advert:$contactId'.hashCode
|
||||
: DateTime.now().millisecondsSinceEpoch,
|
||||
title: _l10n.notification_newTypeDiscovered(contactType),
|
||||
body: contactName,
|
||||
notificationDetails: notificationDetails,
|
||||
@@ -331,6 +333,61 @@ class NotificationService {
|
||||
await _notifications.cancel(id: id);
|
||||
}
|
||||
|
||||
/// Cancel the notification for a specific contact and update the app badge.
|
||||
Future<void> clearContactNotification(
|
||||
String contactId,
|
||||
int totalUnreadCount,
|
||||
) async {
|
||||
if (!await _ensureInitialized()) return;
|
||||
await _notifications.cancel(id: contactId.hashCode);
|
||||
await _updateBadge(totalUnreadCount);
|
||||
}
|
||||
|
||||
/// Cancel the notification for a specific channel and update the app badge.
|
||||
Future<void> clearChannelNotification(
|
||||
int channelIndex,
|
||||
int totalUnreadCount,
|
||||
) async {
|
||||
if (!await _ensureInitialized()) return;
|
||||
await _notifications.cancel(id: channelIndex.hashCode);
|
||||
await _updateBadge(totalUnreadCount);
|
||||
}
|
||||
|
||||
/// Cancel advert notifications for the given contact public key hexes.
|
||||
Future<void> clearAdvertNotifications(List<String> contactIds) async {
|
||||
if (!await _ensureInitialized()) return;
|
||||
for (final id in contactIds) {
|
||||
await _notifications.cancel(id: 'advert:$id'.hashCode);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _updateBadge(int count) async {
|
||||
if (PlatformInfo.isIOS || PlatformInfo.isMacOS) {
|
||||
// On Apple platforms, set the badge number directly via a silent update.
|
||||
final darwinDetails = DarwinNotificationDetails(
|
||||
presentAlert: false,
|
||||
presentSound: false,
|
||||
presentBadge: true,
|
||||
badgeNumber: count,
|
||||
);
|
||||
final details = NotificationDetails(
|
||||
iOS: darwinDetails,
|
||||
macOS: darwinDetails,
|
||||
);
|
||||
// Use a fixed ID so each update replaces the previous one.
|
||||
await _notifications.show(
|
||||
id: 'badge_update'.hashCode,
|
||||
title: null,
|
||||
body: null,
|
||||
notificationDetails: details,
|
||||
);
|
||||
// Immediately cancel the silent notification so it doesn't appear in tray.
|
||||
await _notifications.cancel(id: 'badge_update'.hashCode);
|
||||
}
|
||||
// On Android, badge count is derived from active notifications,
|
||||
// so cancelling the specific notification above is sufficient.
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────
|
||||
// Public notification methods (rate limiting is enforced automatically)
|
||||
// ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import 'dart:convert';
|
||||
import '../models/delivery_observation.dart';
|
||||
import '../models/path_history.dart';
|
||||
import '../storage/prefs_manager.dart';
|
||||
|
||||
@@ -6,6 +7,7 @@ class StorageService {
|
||||
static const String _pathHistoryPrefix = 'path_history_';
|
||||
static const String _pendingMessagesKey = 'pending_messages';
|
||||
static const String _repeaterPasswordsKey = 'repeater_passwords';
|
||||
static const String _deliveryObservationsKey = 'delivery_observations';
|
||||
|
||||
Future<void> savePathHistory(
|
||||
String contactPubKeyHex,
|
||||
@@ -122,4 +124,33 @@ class StorageService {
|
||||
final prefs = PrefsManager.instance;
|
||||
await prefs.remove(_repeaterPasswordsKey);
|
||||
}
|
||||
|
||||
Future<void> saveDeliveryObservations(
|
||||
List<DeliveryObservation> observations,
|
||||
) async {
|
||||
final prefs = PrefsManager.instance;
|
||||
final jsonStr = jsonEncode(observations.map((o) => o.toJson()).toList());
|
||||
await prefs.setString(_deliveryObservationsKey, jsonStr);
|
||||
}
|
||||
|
||||
Future<List<DeliveryObservation>> loadDeliveryObservations() async {
|
||||
final prefs = PrefsManager.instance;
|
||||
final jsonStr = prefs.getString(_deliveryObservationsKey);
|
||||
|
||||
if (jsonStr == null) return [];
|
||||
|
||||
try {
|
||||
final list = jsonDecode(jsonStr) as List;
|
||||
return list
|
||||
.map((e) => DeliveryObservation.fromJson(e as Map<String, dynamic>))
|
||||
.toList();
|
||||
} catch (e) {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> clearDeliveryObservations() async {
|
||||
final prefs = PrefsManager.instance;
|
||||
await prefs.remove(_deliveryObservationsKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,229 @@
|
||||
import 'dart:async';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:ml_algo/ml_algo.dart';
|
||||
import 'package:ml_dataframe/ml_dataframe.dart';
|
||||
import '../models/delivery_observation.dart';
|
||||
import 'storage_service.dart';
|
||||
|
||||
class _ContactStats {
|
||||
int count = 0;
|
||||
double _sum = 0;
|
||||
|
||||
void add(double ms) {
|
||||
count++;
|
||||
_sum += ms;
|
||||
}
|
||||
|
||||
double get mean => _sum / count;
|
||||
}
|
||||
|
||||
class TimeoutPredictionService extends ChangeNotifier {
|
||||
final StorageService? _storage;
|
||||
|
||||
static const int minObservations = 10;
|
||||
static const int maxObservations = 100;
|
||||
static const int _retrainInterval = 5;
|
||||
// 1.5x multiplier on raw prediction to account for variance in delivery
|
||||
// times — tight enough to improve on worst-case physics, loose enough
|
||||
// to avoid premature timeouts from model noise.
|
||||
static const double _safetyMargin = 1.5;
|
||||
static const int _minContactObservations = 10;
|
||||
|
||||
List<DeliveryObservation> _observations = [];
|
||||
LinearRegressor? _model;
|
||||
List<String> _activeFeatures = [];
|
||||
int _observationsSinceLastTrain = 0;
|
||||
final Map<String, _ContactStats> _contactStats = {};
|
||||
Timer? _persistTimer;
|
||||
|
||||
TimeoutPredictionService(StorageService storage) : _storage = storage;
|
||||
TimeoutPredictionService.noStorage() : _storage = null;
|
||||
|
||||
int get observationCount => _observations.length;
|
||||
bool get hasModel => _model != null;
|
||||
|
||||
Future<void> initialize() async {
|
||||
_observations = await _storage?.loadDeliveryObservations() ?? [];
|
||||
_rebuildContactStats();
|
||||
|
||||
if (_observations.length >= minObservations) {
|
||||
_trainModel();
|
||||
}
|
||||
|
||||
debugPrint(
|
||||
'TimeoutPrediction: initialized with ${_observations.length} observations, '
|
||||
'model=${_model != null ? "ready" : "waiting for data"}',
|
||||
);
|
||||
}
|
||||
|
||||
void recordObservation({
|
||||
required String contactKey,
|
||||
required int pathLength,
|
||||
required int messageBytes,
|
||||
required int tripTimeMs,
|
||||
int secondsSinceLastRx = 0,
|
||||
}) {
|
||||
final observation = DeliveryObservation(
|
||||
contactKey: contactKey,
|
||||
pathLength: pathLength,
|
||||
messageBytes: messageBytes,
|
||||
secondsSinceLastRx: secondsSinceLastRx,
|
||||
isFlood: pathLength < 0,
|
||||
deliveryMs: tripTimeMs,
|
||||
timestamp: DateTime.now(),
|
||||
);
|
||||
|
||||
_observations.add(observation);
|
||||
if (_observations.length > maxObservations) {
|
||||
_observations.removeAt(0);
|
||||
}
|
||||
|
||||
_contactStats.putIfAbsent(contactKey, () => _ContactStats());
|
||||
_contactStats[contactKey]!.add(tripTimeMs.toDouble());
|
||||
|
||||
_observationsSinceLastTrain++;
|
||||
if (_observationsSinceLastTrain >= _retrainInterval &&
|
||||
_observations.length >= minObservations) {
|
||||
_trainModel();
|
||||
}
|
||||
|
||||
_persistTimer?.cancel();
|
||||
_persistTimer = Timer(const Duration(seconds: 2), () {
|
||||
_storage?.saveDeliveryObservations(_observations);
|
||||
});
|
||||
debugPrint(
|
||||
'TimeoutPrediction: recorded ${tripTimeMs}ms for $pathLength hops '
|
||||
'(${_observations.length} total)',
|
||||
);
|
||||
}
|
||||
|
||||
int? predictTimeout({
|
||||
String? contactKey,
|
||||
required int pathLength,
|
||||
required int messageBytes,
|
||||
int secondsSinceLastRx = 0,
|
||||
}) {
|
||||
if (_model == null) return null;
|
||||
|
||||
try {
|
||||
if (_activeFeatures.isEmpty) return null;
|
||||
|
||||
final allFeatures = {
|
||||
'pathLength': pathLength.toDouble(),
|
||||
'messageBytes': messageBytes.toDouble(),
|
||||
'secSinceRx': secondsSinceLastRx.toDouble(),
|
||||
'isFlood': pathLength < 0 ? 1.0 : 0.0,
|
||||
};
|
||||
final row = _activeFeatures.map((f) => allFeatures[f]!).toList();
|
||||
|
||||
final features = DataFrame(
|
||||
[row],
|
||||
headerExists: false,
|
||||
header: _activeFeatures,
|
||||
);
|
||||
|
||||
final prediction = _model!.predict(features);
|
||||
final rawValue = prediction.rows.first.first;
|
||||
var predictedMs = (rawValue is double)
|
||||
? rawValue
|
||||
: (rawValue as num).toDouble();
|
||||
|
||||
debugPrint(
|
||||
'TimeoutPrediction: raw prediction=$predictedMs for '
|
||||
'pathLength=$pathLength, messageBytes=$messageBytes, '
|
||||
'features=$_activeFeatures',
|
||||
);
|
||||
|
||||
// Sanity check: if prediction is negative or zero, fall back
|
||||
if (predictedMs <= 0) return null;
|
||||
|
||||
// Blend with per-contact mean if enough data
|
||||
if (contactKey != null) {
|
||||
final stats = _contactStats[contactKey];
|
||||
if (stats != null && stats.count >= _minContactObservations) {
|
||||
predictedMs = 0.5 * predictedMs + 0.5 * stats.mean;
|
||||
}
|
||||
}
|
||||
|
||||
// Connector clamps this between physics min/max bounds
|
||||
final timeout = (predictedMs * _safetyMargin).ceil();
|
||||
debugPrint(
|
||||
'TimeoutPrediction: ML timeout ${timeout}ms '
|
||||
'(raw: ${predictedMs.round()}ms, contact: $contactKey)',
|
||||
);
|
||||
return timeout;
|
||||
} catch (e) {
|
||||
debugPrint('TimeoutPrediction: prediction failed: $e');
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
void _trainModel() {
|
||||
try {
|
||||
// Build feature columns, then exclude any with zero variance
|
||||
// (ml_algo's OLS produces all-zero coefficients for singular matrices)
|
||||
final allNames = ['pathLength', 'messageBytes', 'secSinceRx', 'isFlood'];
|
||||
final allExtractors = <double Function(DeliveryObservation)>[
|
||||
(o) => o.pathLength.toDouble(),
|
||||
(o) => o.messageBytes.toDouble(),
|
||||
(o) => o.secondsSinceLastRx.toDouble(),
|
||||
(o) => o.isFlood ? 1.0 : 0.0,
|
||||
];
|
||||
|
||||
_activeFeatures = [];
|
||||
for (var i = 0; i < allNames.length; i++) {
|
||||
final values = _observations.map(allExtractors[i]).toSet();
|
||||
if (values.length > 1) _activeFeatures.add(allNames[i]);
|
||||
}
|
||||
|
||||
if (_activeFeatures.isEmpty) {
|
||||
debugPrint(
|
||||
'TimeoutPrediction: no features with variance, skipping training',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
final header = [..._activeFeatures, 'deliveryMs'];
|
||||
final rows = _observations.map((o) {
|
||||
final row = <double>[];
|
||||
for (var i = 0; i < allNames.length; i++) {
|
||||
if (_activeFeatures.contains(allNames[i])) {
|
||||
row.add(allExtractors[i](o));
|
||||
}
|
||||
}
|
||||
row.add(o.deliveryMs.toDouble());
|
||||
return row;
|
||||
});
|
||||
|
||||
final data = DataFrame([header, ...rows], headerExists: true);
|
||||
|
||||
_model = LinearRegressor(data, 'deliveryMs');
|
||||
_observationsSinceLastTrain = 0;
|
||||
|
||||
// Log training summary with sample predictions
|
||||
final avgMs =
|
||||
_observations.map((o) => o.deliveryMs).reduce((a, b) => a + b) /
|
||||
_observations.length;
|
||||
debugPrint(
|
||||
'TimeoutPrediction: trained on ${_observations.length} observations '
|
||||
'(avg: ${avgMs.round()}ms, features: $_activeFeatures)',
|
||||
);
|
||||
} catch (e) {
|
||||
debugPrint('TimeoutPrediction: training failed: $e');
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_persistTimer?.cancel();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
void _rebuildContactStats() {
|
||||
_contactStats.clear();
|
||||
for (final obs in _observations) {
|
||||
_contactStats.putIfAbsent(obs.contactKey, () => _ContactStats());
|
||||
_contactStats[obs.contactKey]!.add(obs.deliveryMs.toDouble());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user