fix: Database error handling

This commit is contained in:
Christian Pauly 2020-08-26 09:38:14 +02:00
parent 6fbee4ee05
commit 9142dcbeec
2 changed files with 75 additions and 51 deletions

View file

@ -502,7 +502,7 @@ class Client extends MatrixApi {
StreamController.broadcast(); StreamController.broadcast();
/// Synchronization erros are coming here. /// Synchronization erros are coming here.
final StreamController<SyncError> onSyncError = StreamController.broadcast(); final StreamController<SdkError> onSyncError = StreamController.broadcast();
/// Synchronization erros are coming here. /// Synchronization erros are coming here.
final StreamController<ToDeviceEventDecryptionError> onOlmError = final StreamController<ToDeviceEventDecryptionError> onOlmError =
@ -725,7 +725,7 @@ class Client extends MatrixApi {
return; return;
} }
Logs.error('Error during processing events: ' + e.toString(), s); Logs.error('Error during processing events: ' + e.toString(), s);
onSyncError.add(SyncError( onSyncError.add(SdkError(
exception: e is Exception ? e : Exception(e), stackTrace: s)); exception: e is Exception ? e : Exception(e), stackTrace: s));
await Future.delayed(Duration(seconds: syncErrorTimeoutSec), _sync); await Future.delayed(Duration(seconds: syncErrorTimeoutSec), _sync);
} }
@ -1478,8 +1478,8 @@ class Client extends MatrixApi {
} }
} }
class SyncError { class SdkError {
Exception exception; Exception exception;
StackTrace stackTrace; StackTrace stackTrace;
SyncError({this.exception, this.stackTrace}); SdkError({this.exception, this.stackTrace});
} }

View file

@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:moor/moor.dart'; import 'package:moor/moor.dart';
@ -6,6 +7,7 @@ import 'package:olm/olm.dart' as olm;
import '../../famedlysdk.dart' as sdk; import '../../famedlysdk.dart' as sdk;
import '../../matrix_api.dart' as api; import '../../matrix_api.dart' as api;
import '../../matrix_api.dart'; import '../../matrix_api.dart';
import '../client.dart';
import '../room.dart'; import '../room.dart';
import '../utils/logs.dart'; import '../utils/logs.dart';
@ -57,61 +59,83 @@ class Database extends _$Database {
int get maxFileSize => 1 * 1024 * 1024; int get maxFileSize => 1 * 1024 * 1024;
/// Update errors are coming here.
final StreamController<SdkError> onError = StreamController.broadcast();
@override @override
MigrationStrategy get migration => MigrationStrategy( MigrationStrategy get migration => MigrationStrategy(
onCreate: (Migrator m) { onCreate: (Migrator m) async {
return m.createAll(); try {
await m.createAll();
} catch (e, s) {
Logs.error(e, s);
onError.add(SdkError(exception: e, stackTrace: s));
rethrow;
}
}, },
onUpgrade: (Migrator m, int from, int to) async { onUpgrade: (Migrator m, int from, int to) async {
// this appears to be only called once, so multiple consecutive upgrades have to be handled appropriately in here try {
if (from == 1) { // this appears to be only called once, so multiple consecutive upgrades have to be handled appropriately in here
await m.createIndexIfNotExists(userDeviceKeysIndex); if (from == 1) {
await m.createIndexIfNotExists(userDeviceKeysKeyIndex); await m.createIndexIfNotExists(userDeviceKeysIndex);
await m.createIndexIfNotExists(olmSessionsIndex); await m.createIndexIfNotExists(userDeviceKeysKeyIndex);
await m.createIndexIfNotExists(outboundGroupSessionsIndex); await m.createIndexIfNotExists(olmSessionsIndex);
await m.createIndexIfNotExists(inboundGroupSessionsIndex); await m.createIndexIfNotExists(outboundGroupSessionsIndex);
await m.createIndexIfNotExists(roomsIndex); await m.createIndexIfNotExists(inboundGroupSessionsIndex);
await m.createIndexIfNotExists(eventsIndex); await m.createIndexIfNotExists(roomsIndex);
await m.createIndexIfNotExists(roomStatesIndex); await m.createIndexIfNotExists(eventsIndex);
await m.createIndexIfNotExists(accountDataIndex); await m.createIndexIfNotExists(roomStatesIndex);
await m.createIndexIfNotExists(roomAccountDataIndex); await m.createIndexIfNotExists(accountDataIndex);
await m.createIndexIfNotExists(presencesIndex); await m.createIndexIfNotExists(roomAccountDataIndex);
from++; await m.createIndexIfNotExists(presencesIndex);
} from++;
if (from == 2) { }
await m.deleteTable('outbound_group_sessions'); if (from == 2) {
await m.createTable(outboundGroupSessions); await m.deleteTable('outbound_group_sessions');
from++; await m.createTable(outboundGroupSessions);
} from++;
if (from == 3) { }
await m.createTableIfNotExists(userCrossSigningKeys); if (from == 3) {
await m.createTableIfNotExists(ssssCache); await m.createTableIfNotExists(userCrossSigningKeys);
// mark all keys as outdated so that the cross signing keys will be fetched await m.createTableIfNotExists(ssssCache);
await m.issueCustomQuery( // mark all keys as outdated so that the cross signing keys will be fetched
'UPDATE user_device_keys SET outdated = true'); await m.issueCustomQuery(
from++; 'UPDATE user_device_keys SET outdated = true');
} from++;
if (from == 4) { }
await m.addColumnIfNotExists(olmSessions, olmSessions.lastReceived); if (from == 4) {
from++; await m.addColumnIfNotExists(
} olmSessions, olmSessions.lastReceived);
if (from == 5) { from++;
await m.addColumnIfNotExists( }
inboundGroupSessions, inboundGroupSessions.uploaded); if (from == 5) {
await m.addColumnIfNotExists( await m.addColumnIfNotExists(
inboundGroupSessions, inboundGroupSessions.senderKey); inboundGroupSessions, inboundGroupSessions.uploaded);
await m.addColumnIfNotExists( await m.addColumnIfNotExists(
inboundGroupSessions, inboundGroupSessions.senderClaimedKeys); inboundGroupSessions, inboundGroupSessions.senderKey);
from++; await m.addColumnIfNotExists(
inboundGroupSessions, inboundGroupSessions.senderClaimedKeys);
from++;
}
} catch (e, s) {
Logs.error(e, s);
onError.add(SdkError(exception: e, stackTrace: s));
rethrow;
} }
}, },
beforeOpen: (_) async { beforeOpen: (_) async {
if (executor.dialect == SqlDialect.sqlite) { try {
final ret = await customSelect('PRAGMA journal_mode=WAL').get(); if (executor.dialect == SqlDialect.sqlite) {
if (ret.isNotEmpty) { final ret = await customSelect('PRAGMA journal_mode=WAL').get();
Logs.info('[Moor] Switched database to mode ' + if (ret.isNotEmpty) {
ret.first.data['journal_mode'].toString()); Logs.info('[Moor] Switched database to mode ' +
ret.first.data['journal_mode'].toString());
}
} }
} catch (e, s) {
Logs.error(e, s);
onError.add(SdkError(exception: e, stackTrace: s));
rethrow;
} }
}, },
); );