mirror of
https://github.com/zjs81/meshcore-open.git
synced 2026-06-22 02:14:28 +10:00
Add advanced path management, debug logging, and fix channel sync
New features: - In-app debug log viewer with copy/clear functionality - Advanced path management UI with history and custom path builder - Battery indicator widget with voltage/percentage toggle - Contact/channel filtering and sorting improvements - Repeater command ACK tracking with path history integration Fixes: - Switch channel sync from parallel to sequential to prevent timeouts - Preserve path overrides when contacts refresh from device - Fix ACK hash computation for SMAZ-encoded messages - Proper cleanup of pending operations on disconnect
This commit is contained in:
@@ -0,0 +1,92 @@
|
||||
import 'package:flutter/foundation.dart';
|
||||
|
||||
enum AppDebugLogLevel {
|
||||
info,
|
||||
warning,
|
||||
error,
|
||||
}
|
||||
|
||||
class AppDebugLogEntry {
|
||||
final DateTime timestamp;
|
||||
final AppDebugLogLevel level;
|
||||
final String tag;
|
||||
final String message;
|
||||
|
||||
AppDebugLogEntry({
|
||||
required this.timestamp,
|
||||
required this.level,
|
||||
required this.tag,
|
||||
required this.message,
|
||||
});
|
||||
|
||||
String get levelLabel {
|
||||
switch (level) {
|
||||
case AppDebugLogLevel.info:
|
||||
return 'INFO';
|
||||
case AppDebugLogLevel.warning:
|
||||
return 'WARN';
|
||||
case AppDebugLogLevel.error:
|
||||
return 'ERROR';
|
||||
}
|
||||
}
|
||||
|
||||
String get formattedTime {
|
||||
return '${timestamp.hour.toString().padLeft(2, '0')}:'
|
||||
'${timestamp.minute.toString().padLeft(2, '0')}:'
|
||||
'${timestamp.second.toString().padLeft(2, '0')}.'
|
||||
'${timestamp.millisecond.toString().padLeft(3, '0')}';
|
||||
}
|
||||
}
|
||||
|
||||
class AppDebugLogService extends ChangeNotifier {
|
||||
static const int maxEntries = 1000;
|
||||
final List<AppDebugLogEntry> _entries = [];
|
||||
bool _enabled = false;
|
||||
|
||||
List<AppDebugLogEntry> get entries => List.unmodifiable(_entries);
|
||||
bool get enabled => _enabled;
|
||||
|
||||
void setEnabled(bool value) {
|
||||
_enabled = value;
|
||||
notifyListeners();
|
||||
}
|
||||
|
||||
void log(String message, {String tag = 'App', AppDebugLogLevel level = AppDebugLogLevel.info}) {
|
||||
if (!_enabled) return;
|
||||
|
||||
_entries.add(
|
||||
AppDebugLogEntry(
|
||||
timestamp: DateTime.now(),
|
||||
level: level,
|
||||
tag: tag,
|
||||
message: message,
|
||||
),
|
||||
);
|
||||
|
||||
if (_entries.length > maxEntries) {
|
||||
_entries.removeRange(0, _entries.length - maxEntries);
|
||||
}
|
||||
|
||||
notifyListeners();
|
||||
|
||||
// Also print to console for development
|
||||
debugPrint('[$tag] $message');
|
||||
}
|
||||
|
||||
void info(String message, {String tag = 'App'}) {
|
||||
log(message, tag: tag, level: AppDebugLogLevel.info);
|
||||
}
|
||||
|
||||
void warn(String message, {String tag = 'App'}) {
|
||||
log(message, tag: tag, level: AppDebugLogLevel.warning);
|
||||
}
|
||||
|
||||
void error(String message, {String tag = 'App'}) {
|
||||
log(message, tag: tag, level: AppDebugLogLevel.error);
|
||||
}
|
||||
|
||||
void clear() {
|
||||
_entries.clear();
|
||||
notifyListeners();
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ import 'dart:convert';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import '../models/app_settings.dart';
|
||||
import '../storage/prefs_manager.dart';
|
||||
import '../utils/app_logger.dart';
|
||||
|
||||
class AppSettingsService extends ChangeNotifier {
|
||||
static const String _settingsKey = 'app_settings';
|
||||
@@ -112,6 +113,12 @@ class AppSettingsService extends ChangeNotifier {
|
||||
await updateSettings(_settings.copyWith(themeMode: value));
|
||||
}
|
||||
|
||||
Future<void> setAppDebugLogEnabled(bool value) async {
|
||||
await updateSettings(_settings.copyWith(appDebugLogEnabled: value));
|
||||
// Update the global logger
|
||||
appLogger.setEnabled(value);
|
||||
}
|
||||
|
||||
Future<void> setBatteryChemistryForDevice(String deviceId, String chemistry) async {
|
||||
final updated = Map<String, String>.from(_settings.batteryChemistryByDeviceId);
|
||||
updated[deviceId] = chemistry;
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:uuid/uuid.dart';
|
||||
import 'package:crypto/crypto.dart';
|
||||
import '../models/contact.dart';
|
||||
import '../models/message.dart';
|
||||
import '../models/path_selection.dart';
|
||||
import 'storage_service.dart';
|
||||
import 'app_settings_service.dart';
|
||||
import 'app_debug_log_service.dart';
|
||||
|
||||
class _AckHistoryEntry {
|
||||
final String messageId;
|
||||
@@ -41,7 +44,8 @@ class MessageRetryService extends ChangeNotifier {
|
||||
final Map<String, _AckHashMapping> _ackHashToMessageId = {}; // ackHashHex → messageId + timestamp for O(1) lookup
|
||||
final Map<String, List<Uint8List>> _expectedAckHashes = {}; // Track all expected ACKs for retries (for history)
|
||||
final List<_AckHistoryEntry> _ackHistory = []; // Rolling buffer of recent ACK hashes
|
||||
final Map<String, List<String>> _pendingMessageQueuePerContact = {}; // contactPubKeyHex → FIFO queue of messageIds
|
||||
final Map<String, List<String>> _pendingMessageQueuePerContact = {}; // contactPubKeyHex → FIFO queue of messageIds (DEPRECATED - will be removed)
|
||||
final Map<String, String> _expectedHashToMessageId = {}; // expectedAckHashHex → messageId (for matching RESP_CODE_SENT by hash)
|
||||
|
||||
Function(Contact, String, int, int)? _sendMessageCallback;
|
||||
Function(String, Message)? _addMessageCallback;
|
||||
@@ -49,7 +53,10 @@ class MessageRetryService extends ChangeNotifier {
|
||||
Function(Contact)? _clearContactPathCallback;
|
||||
Function(Contact, Uint8List, int)? _setContactPathCallback;
|
||||
Function(int, int)? _calculateTimeoutCallback;
|
||||
Uint8List? Function()? _getSelfPublicKeyCallback;
|
||||
String Function(Contact, String)? _prepareContactOutboundTextCallback;
|
||||
AppSettingsService? _appSettingsService;
|
||||
AppDebugLogService? _debugLogService;
|
||||
Function(String, PathSelection, bool, int?)? _recordPathResultCallback;
|
||||
|
||||
MessageRetryService(this._storage);
|
||||
@@ -61,7 +68,10 @@ class MessageRetryService extends ChangeNotifier {
|
||||
Function(Contact)? clearContactPathCallback,
|
||||
Function(Contact, Uint8List, int)? setContactPathCallback,
|
||||
Function(int pathLength, int messageBytes)? calculateTimeoutCallback,
|
||||
Uint8List? Function()? getSelfPublicKeyCallback,
|
||||
String Function(Contact, String)? prepareContactOutboundTextCallback,
|
||||
AppSettingsService? appSettingsService,
|
||||
AppDebugLogService? debugLogService,
|
||||
Function(String, PathSelection, bool, int?)? recordPathResultCallback,
|
||||
}) {
|
||||
_sendMessageCallback = sendMessageCallback;
|
||||
@@ -70,10 +80,46 @@ class MessageRetryService extends ChangeNotifier {
|
||||
_clearContactPathCallback = clearContactPathCallback;
|
||||
_setContactPathCallback = setContactPathCallback;
|
||||
_calculateTimeoutCallback = calculateTimeoutCallback;
|
||||
_getSelfPublicKeyCallback = getSelfPublicKeyCallback;
|
||||
_prepareContactOutboundTextCallback = prepareContactOutboundTextCallback;
|
||||
_appSettingsService = appSettingsService;
|
||||
_debugLogService = debugLogService;
|
||||
_recordPathResultCallback = recordPathResultCallback;
|
||||
}
|
||||
|
||||
/// Compute expected ACK hash using same algorithm as firmware:
|
||||
/// SHA256([timestamp(4)][attempt(1)][text][sender_pubkey(32)]) -> first 4 bytes
|
||||
static Uint8List computeExpectedAckHash(
|
||||
int timestampSeconds,
|
||||
int attempt,
|
||||
String text,
|
||||
Uint8List senderPubKey,
|
||||
) {
|
||||
final textBytes = utf8.encode(text);
|
||||
final buffer = Uint8List(4 + 1 + textBytes.length + senderPubKey.length);
|
||||
int offset = 0;
|
||||
|
||||
// timestamp (4 bytes, little-endian)
|
||||
buffer[offset++] = timestampSeconds & 0xFF;
|
||||
buffer[offset++] = (timestampSeconds >> 8) & 0xFF;
|
||||
buffer[offset++] = (timestampSeconds >> 16) & 0xFF;
|
||||
buffer[offset++] = (timestampSeconds >> 24) & 0xFF;
|
||||
|
||||
// attempt (1 byte)
|
||||
buffer[offset++] = attempt & 0x03;
|
||||
|
||||
// text
|
||||
buffer.setRange(offset, offset + textBytes.length, textBytes);
|
||||
offset += textBytes.length;
|
||||
|
||||
// sender public key (32 bytes)
|
||||
buffer.setRange(offset, offset + senderPubKey.length, senderPubKey);
|
||||
|
||||
// Compute SHA256 and return first 4 bytes
|
||||
final hash = sha256.convert(buffer);
|
||||
return Uint8List.fromList(hash.bytes.sublist(0, 4));
|
||||
}
|
||||
|
||||
Future<void> sendMessageWithRetry({
|
||||
required Contact contact,
|
||||
required String text,
|
||||
@@ -136,14 +182,35 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
|
||||
final attempt = message.retryCount.clamp(0, 3);
|
||||
final timestampSeconds = message.timestamp.millisecondsSinceEpoch ~/ 1000;
|
||||
|
||||
// Enqueue this message to track send order for ACK hash mapping (FIFO)
|
||||
// Compute expected ACK hash that device will return in RESP_CODE_SENT
|
||||
// IMPORTANT: Use the transformed text (with SMAZ encoding if enabled) to match device's hash
|
||||
final selfPubKey = _getSelfPublicKeyCallback?.call();
|
||||
if (selfPubKey != null) {
|
||||
final outboundText = _prepareContactOutboundTextCallback?.call(contact, message.text) ?? message.text;
|
||||
final expectedHash = MessageRetryService.computeExpectedAckHash(
|
||||
timestampSeconds,
|
||||
attempt,
|
||||
outboundText,
|
||||
selfPubKey,
|
||||
);
|
||||
final expectedHashHex = expectedHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join();
|
||||
_expectedHashToMessageId[expectedHashHex] = messageId;
|
||||
|
||||
final shortText = message.text.length > 20 ? '${message.text.substring(0, 20)}...' : message.text;
|
||||
_debugLogService?.info(
|
||||
'Sent "$shortText" to ${contact.name} → expect ACK hash $expectedHashHex (attempt $attempt)',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Computed expected ACK hash $expectedHashHex for message $messageId');
|
||||
}
|
||||
|
||||
// DEPRECATED: Old queue-based matching (kept for fallback)
|
||||
_pendingMessageQueuePerContact[contact.publicKeyHex] ??= [];
|
||||
_pendingMessageQueuePerContact[contact.publicKeyHex]!.add(messageId);
|
||||
debugPrint('Enqueued message $messageId for ${contact.name} (queue size: ${_pendingMessageQueuePerContact[contact.publicKeyHex]!.length})');
|
||||
|
||||
if (_sendMessageCallback != null) {
|
||||
final timestampSeconds = message.timestamp.millisecondsSinceEpoch ~/ 1000;
|
||||
_sendMessageCallback!(
|
||||
contact,
|
||||
message.text,
|
||||
@@ -156,35 +223,68 @@ class MessageRetryService extends ChangeNotifier {
|
||||
void updateMessageFromSent(Uint8List ackHash, int timeoutMs) {
|
||||
final ackHashHex = ackHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join();
|
||||
|
||||
// Dequeue the next message from the FIFO queue to match with this RESP_CODE_SENT
|
||||
// We iterate through contacts to find which one has a pending message in their queue
|
||||
String? messageId;
|
||||
// NEW: Try hash-based matching first (fixes LoRa message drops causing mismatches)
|
||||
String? messageId = _expectedHashToMessageId.remove(ackHashHex);
|
||||
Contact? contact;
|
||||
|
||||
for (var entry in _pendingMessageQueuePerContact.entries) {
|
||||
final contactKey = entry.key;
|
||||
final queue = entry.value;
|
||||
if (messageId != null) {
|
||||
contact = _pendingContacts[messageId];
|
||||
final message = _pendingMessages[messageId];
|
||||
|
||||
if (queue.isNotEmpty) {
|
||||
// Dequeue the first (oldest) message from this contact's queue
|
||||
final candidateMessageId = queue.removeAt(0);
|
||||
if (contact != null && message != null) {
|
||||
final shortText = message.text.length > 20 ? '${message.text.substring(0, 20)}...' : message.text;
|
||||
_debugLogService?.info(
|
||||
'RESP_CODE_SENT received: ACK hash $ackHashHex ✓ matched "$shortText" to ${contact.name}',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Hash-based match: ACK hash $ackHashHex → message $messageId ✓');
|
||||
|
||||
// Verify this message is still pending
|
||||
if (_pendingMessages.containsKey(candidateMessageId)) {
|
||||
messageId = candidateMessageId;
|
||||
contact = _pendingContacts[candidateMessageId];
|
||||
debugPrint('Dequeued message $messageId for $contactKey (remaining in queue: ${queue.length})');
|
||||
break;
|
||||
} else {
|
||||
debugPrint('Dequeued stale message $candidateMessageId - skipping');
|
||||
// Continue to next message in queue
|
||||
if (queue.isNotEmpty) {
|
||||
final nextMessageId = queue.removeAt(0);
|
||||
if (_pendingMessages.containsKey(nextMessageId)) {
|
||||
messageId = nextMessageId;
|
||||
contact = _pendingContacts[nextMessageId];
|
||||
debugPrint('Dequeued next message $messageId for $contactKey (remaining: ${queue.length})');
|
||||
break;
|
||||
// Remove from old queue since we matched
|
||||
_pendingMessageQueuePerContact[contact.publicKeyHex]?.remove(messageId);
|
||||
if (_pendingMessageQueuePerContact[contact.publicKeyHex]?.isEmpty ?? false) {
|
||||
_pendingMessageQueuePerContact.remove(contact.publicKeyHex);
|
||||
}
|
||||
} else {
|
||||
_debugLogService?.warn(
|
||||
'RESP_CODE_SENT: ACK hash $ackHashHex matched but message no longer pending',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Hash matched $messageId but message no longer pending');
|
||||
messageId = null;
|
||||
contact = null;
|
||||
}
|
||||
}
|
||||
|
||||
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
|
||||
if (messageId == null) {
|
||||
_debugLogService?.warn(
|
||||
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Hash-based match failed for $ackHashHex, falling back to queue-based matching');
|
||||
|
||||
for (var entry in _pendingMessageQueuePerContact.entries) {
|
||||
final contactKey = entry.key;
|
||||
final queue = entry.value;
|
||||
|
||||
if (queue.isNotEmpty) {
|
||||
final candidateMessageId = queue.removeAt(0);
|
||||
|
||||
if (_pendingMessages.containsKey(candidateMessageId)) {
|
||||
messageId = candidateMessageId;
|
||||
contact = _pendingContacts[candidateMessageId];
|
||||
debugPrint('Queue-based match (fallback): $ackHashHex → message $messageId for $contactKey');
|
||||
break;
|
||||
} else {
|
||||
debugPrint('Dequeued stale message $candidateMessageId - skipping');
|
||||
if (queue.isNotEmpty) {
|
||||
final nextMessageId = queue.removeAt(0);
|
||||
if (_pendingMessages.containsKey(nextMessageId)) {
|
||||
messageId = nextMessageId;
|
||||
contact = _pendingContacts[nextMessageId];
|
||||
debugPrint('Queue-based match (fallback): $ackHashHex → message $messageId');
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -192,7 +292,7 @@ class MessageRetryService extends ChangeNotifier {
|
||||
}
|
||||
|
||||
if (messageId == null || contact == null) {
|
||||
debugPrint('No pending message found for ACK hash: $ackHashHex (all queues empty or stale)');
|
||||
debugPrint('No pending message found for ACK hash: $ackHashHex');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -270,6 +370,11 @@ class MessageRetryService extends ChangeNotifier {
|
||||
return;
|
||||
}
|
||||
|
||||
final shortText = message.text.length > 20 ? '${message.text.substring(0, 20)}...' : message.text;
|
||||
_debugLogService?.warn(
|
||||
'Timeout: No ACK received for "$shortText" to ${contact.name} (attempt ${message.retryCount}) → retrying',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Timeout for message $messageId (retry ${message.retryCount}/${maxRetries - 1})');
|
||||
|
||||
if (message.retryCount < maxRetries - 1) {
|
||||
@@ -287,8 +392,14 @@ class MessageRetryService extends ChangeNotifier {
|
||||
_updateMessageCallback!(updatedMessage);
|
||||
}
|
||||
|
||||
_debugLogService?.info(
|
||||
'Scheduling retry for "$shortText" to ${contact.name} after ${backoffMs}ms backoff',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('Scheduling retry after ${backoffMs}ms');
|
||||
Timer(Duration(milliseconds: backoffMs), () {
|
||||
|
||||
// Store the backoff timer so it can be canceled if new RESP_CODE_SENT arrives
|
||||
_timeoutTimers[messageId] = Timer(Duration(milliseconds: backoffMs), () {
|
||||
// Double-check message is still pending before retry
|
||||
if (_pendingMessages.containsKey(messageId)) {
|
||||
_attemptSend(messageId);
|
||||
@@ -388,6 +499,10 @@ class MessageRetryService extends ChangeNotifier {
|
||||
matchedMessageId = mapping.messageId;
|
||||
debugPrint('Matched ACK to message via direct lookup: $matchedMessageId');
|
||||
} else {
|
||||
_debugLogService?.warn(
|
||||
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex not found in direct mapping, trying fallback',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
// Fallback: Check against ALL expected ACK hashes (from all retry attempts)
|
||||
debugPrint('ACK not in mapping, checking _expectedAckHashes (${_expectedAckHashes.length} messages)');
|
||||
for (var entry in _expectedAckHashes.entries) {
|
||||
@@ -411,6 +526,12 @@ class MessageRetryService extends ChangeNotifier {
|
||||
final contact = _pendingContacts[matchedMessageId];
|
||||
final selection = _pendingPathSelections[matchedMessageId];
|
||||
|
||||
final shortText = message.text.length > 20 ? '${message.text.substring(0, 20)}...' : message.text;
|
||||
_debugLogService?.info(
|
||||
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex ✓ "$shortText" delivered to ${contact?.name ?? "unknown"} in ${tripTimeMs}ms',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
|
||||
// Cancel any pending timeout or retry
|
||||
_timeoutTimers[matchedMessageId]?.cancel();
|
||||
_timeoutTimers.remove(matchedMessageId);
|
||||
@@ -448,8 +569,16 @@ class MessageRetryService extends ChangeNotifier {
|
||||
} else {
|
||||
// Check ACK history for recently completed messages
|
||||
if (_checkAckHistory(ackHash)) {
|
||||
_debugLogService?.info(
|
||||
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex matched a recently completed message (duplicate ACK)',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('ACK matched a recently completed message from history');
|
||||
} else {
|
||||
_debugLogService?.error(
|
||||
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex has no matching message!',
|
||||
tag: 'AckHash',
|
||||
);
|
||||
debugPrint('No matching message found for ACK: $ackHashHex');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import 'dart:async';
|
||||
import '../models/contact.dart';
|
||||
import '../models/path_selection.dart';
|
||||
import '../connector/meshcore_connector.dart';
|
||||
import '../connector/meshcore_protocol.dart';
|
||||
|
||||
@@ -11,7 +12,6 @@ class RepeaterCommandService {
|
||||
final Map<String, String> _pendingByPrefix = {};
|
||||
int _prefixCounter = 0;
|
||||
|
||||
static const int timeoutSeconds = 10; // Flood mode timeout
|
||||
static const int maxRetries = 5;
|
||||
|
||||
RepeaterCommandService(this._connector);
|
||||
@@ -23,6 +23,7 @@ class RepeaterCommandService {
|
||||
String command, {
|
||||
Function(String)? onResponse,
|
||||
Function(int)? onAttempt,
|
||||
int retries = maxRetries,
|
||||
}) async {
|
||||
final repeaterKey = repeater.publicKeyHex;
|
||||
final hasPending = _pendingCommands.keys.any((id) => id.startsWith(repeaterKey));
|
||||
@@ -30,43 +31,83 @@ class RepeaterCommandService {
|
||||
throw Exception('Another command is still awaiting a response.');
|
||||
}
|
||||
|
||||
// Create completer for this command
|
||||
final attemptCount = retries < 1 ? 1 : retries;
|
||||
final selection = await _connector.preparePathForContactSend(repeater);
|
||||
|
||||
for (int attempt = 0; attempt < attemptCount; attempt++) {
|
||||
onAttempt?.call(attempt + 1);
|
||||
try {
|
||||
final response = await _sendCommandAttempt(
|
||||
repeater,
|
||||
command,
|
||||
selection,
|
||||
attempt,
|
||||
);
|
||||
onResponse?.call(response);
|
||||
return response;
|
||||
} catch (e) {
|
||||
if (attempt == attemptCount - 1) rethrow;
|
||||
}
|
||||
}
|
||||
|
||||
throw Exception('Command failed after $attemptCount attempts');
|
||||
}
|
||||
|
||||
Future<String> _sendCommandAttempt(
|
||||
Contact repeater,
|
||||
String command,
|
||||
PathSelection selection,
|
||||
int attempt,
|
||||
) async {
|
||||
final repeaterKey = repeater.publicKeyHex;
|
||||
final commandId = '${repeaterKey}_${DateTime.now().millisecondsSinceEpoch}';
|
||||
final completer = Completer<String>();
|
||||
_pendingCommands[commandId] = completer;
|
||||
|
||||
onAttempt?.call(0);
|
||||
|
||||
// Send frame once (no retries)
|
||||
try {
|
||||
final prefix = _nextPrefixToken();
|
||||
_commandPrefixes[commandId] = prefix;
|
||||
_pendingByPrefix[prefix] = commandId;
|
||||
final framedCommand = '$prefix$command';
|
||||
final frame = buildSendCliCommandFrame(repeater.publicKey, framedCommand, attempt: 0);
|
||||
final pathLengthValue = selection.useFlood ? -1 : selection.hopCount;
|
||||
final timeoutMs = _connector.calculateTimeout(
|
||||
pathLength: pathLengthValue,
|
||||
messageBytes: framedCommand.length,
|
||||
);
|
||||
final timeoutSeconds = (timeoutMs / 1000).ceil();
|
||||
final timestampSeconds = DateTime.now().millisecondsSinceEpoch ~/ 1000;
|
||||
_connector.trackRepeaterAck(
|
||||
contact: repeater,
|
||||
selection: selection,
|
||||
text: framedCommand,
|
||||
timestampSeconds: timestampSeconds,
|
||||
attempt: attempt,
|
||||
);
|
||||
final frame = buildSendCliCommandFrame(
|
||||
repeater.publicKey,
|
||||
framedCommand,
|
||||
attempt: attempt,
|
||||
timestampSeconds: timestampSeconds,
|
||||
);
|
||||
await _connector.sendFrame(frame);
|
||||
_commandTimeouts[commandId]?.cancel();
|
||||
_commandTimeouts[commandId] = Timer(
|
||||
Duration(milliseconds: timeoutMs),
|
||||
() {
|
||||
final completer = _pendingCommands[commandId];
|
||||
if (completer != null && !completer.isCompleted) {
|
||||
completer.completeError('Command timeout after $timeoutSeconds seconds');
|
||||
_cleanup(commandId);
|
||||
}
|
||||
},
|
||||
);
|
||||
} catch (e) {
|
||||
_cleanup(commandId);
|
||||
throw Exception('Failed to send command: $e');
|
||||
}
|
||||
|
||||
// Set timeout for this attempt
|
||||
_commandTimeouts[commandId]?.cancel();
|
||||
_commandTimeouts[commandId] = Timer(
|
||||
Duration(seconds: timeoutSeconds),
|
||||
() {
|
||||
final completer = _pendingCommands[commandId];
|
||||
if (completer != null && !completer.isCompleted) {
|
||||
completer.completeError('Command timeout after $timeoutSeconds seconds');
|
||||
_cleanup(commandId);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Wait for response or timeout
|
||||
try {
|
||||
final response = await completer.future;
|
||||
return response;
|
||||
return await completer.future;
|
||||
} finally {
|
||||
_cleanup(commandId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user