decrypt events in sync loop, making it async
This commit is contained in:
parent
3ee5c2effa
commit
c5e4e2c751
|
@ -986,16 +986,17 @@ class Client {
|
|||
final syncResp = await _syncRequest;
|
||||
if (hash != _syncRequest.hashCode) return;
|
||||
_timeoutFactor = 1;
|
||||
final futures = handleSync(syncResp);
|
||||
if (database != null) {
|
||||
await database.transaction(() async {
|
||||
await handleSync(syncResp);
|
||||
if (prevBatch != syncResp['next_batch']) {
|
||||
await database.storePrevBatch(syncResp['next_batch'], id);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
await handleSync(syncResp);
|
||||
}
|
||||
if (_disposed) return;
|
||||
await database?.transaction(() async {
|
||||
for (final f in futures) {
|
||||
await f();
|
||||
}
|
||||
if (prevBatch != syncResp['next_batch']) {
|
||||
await database.storePrevBatch(syncResp['next_batch'], id);
|
||||
}
|
||||
});
|
||||
if (prevBatch == null) {
|
||||
onFirstSync.add(true);
|
||||
prevBatch = syncResp['next_batch'];
|
||||
|
@ -1015,34 +1016,33 @@ class Client {
|
|||
}
|
||||
|
||||
/// Use this method only for testing utilities!
|
||||
List<Future<dynamic> Function()> handleSync(dynamic sync) {
|
||||
final dbActions = <Future<dynamic> Function()>[];
|
||||
Future<void> handleSync(dynamic sync) async {
|
||||
if (sync['to_device'] is Map<String, dynamic> &&
|
||||
sync['to_device']['events'] is List<dynamic>) {
|
||||
_handleToDeviceEvents(sync['to_device']['events']);
|
||||
}
|
||||
if (sync['rooms'] is Map<String, dynamic>) {
|
||||
if (sync['rooms']['join'] is Map<String, dynamic>) {
|
||||
_handleRooms(sync['rooms']['join'], Membership.join, dbActions);
|
||||
await _handleRooms(sync['rooms']['join'], Membership.join);
|
||||
}
|
||||
if (sync['rooms']['invite'] is Map<String, dynamic>) {
|
||||
_handleRooms(sync['rooms']['invite'], Membership.invite, dbActions);
|
||||
await _handleRooms(sync['rooms']['invite'], Membership.invite);
|
||||
}
|
||||
if (sync['rooms']['leave'] is Map<String, dynamic>) {
|
||||
_handleRooms(sync['rooms']['leave'], Membership.leave, dbActions);
|
||||
await _handleRooms(sync['rooms']['leave'], Membership.leave);
|
||||
}
|
||||
}
|
||||
if (sync['presence'] is Map<String, dynamic> &&
|
||||
sync['presence']['events'] is List<dynamic>) {
|
||||
_handleGlobalEvents(sync['presence']['events'], 'presence', dbActions);
|
||||
await _handleGlobalEvents(sync['presence']['events'], 'presence');
|
||||
}
|
||||
if (sync['account_data'] is Map<String, dynamic> &&
|
||||
sync['account_data']['events'] is List<dynamic>) {
|
||||
_handleGlobalEvents(
|
||||
sync['account_data']['events'], 'account_data', dbActions);
|
||||
await _handleGlobalEvents(
|
||||
sync['account_data']['events'], 'account_data');
|
||||
}
|
||||
if (sync['device_lists'] is Map<String, dynamic>) {
|
||||
_handleDeviceListsEvents(sync['device_lists'], dbActions);
|
||||
await _handleDeviceListsEvents(sync['device_lists']);
|
||||
}
|
||||
if (sync['device_one_time_keys_count'] is Map<String, dynamic>) {
|
||||
_handleDeviceOneTimeKeysCount(sync['device_one_time_keys_count']);
|
||||
|
@ -1054,7 +1054,6 @@ class Client {
|
|||
);
|
||||
}
|
||||
onSync.add(sync);
|
||||
return dbActions;
|
||||
}
|
||||
|
||||
void _handleDeviceOneTimeKeysCount(
|
||||
|
@ -1071,15 +1070,13 @@ class Client {
|
|||
}
|
||||
}
|
||||
|
||||
void _handleDeviceListsEvents(Map<String, dynamic> deviceLists,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
Future<void> _handleDeviceListsEvents(Map<String, dynamic> deviceLists) async {
|
||||
if (deviceLists['changed'] is List) {
|
||||
for (final userId in deviceLists['changed']) {
|
||||
if (_userDeviceKeys.containsKey(userId)) {
|
||||
_userDeviceKeys[userId].outdated = true;
|
||||
if (database != null) {
|
||||
dbActions
|
||||
.add(() => database.storeUserDeviceKeysInfo(id, userId, true));
|
||||
await database.storeUserDeviceKeysInfo(id, userId, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1175,9 +1172,10 @@ class Client {
|
|||
}
|
||||
}
|
||||
|
||||
void _handleRooms(Map<String, dynamic> rooms, Membership membership,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
rooms.forEach((String id, dynamic room) {
|
||||
Future<void> _handleRooms(Map<String, dynamic> rooms, Membership membership) async {
|
||||
for (final entry in rooms.entries) {
|
||||
final id = entry.key;
|
||||
final room = entry.value;
|
||||
// calculate the notification counts, the limitedTimeline and prevbatch
|
||||
num highlight_count = 0;
|
||||
num notification_count = 0;
|
||||
|
@ -1224,8 +1222,7 @@ class Client {
|
|||
roomObj.resetSortOrder();
|
||||
}
|
||||
if (database != null) {
|
||||
dbActions.add(
|
||||
() => database.storeRoomUpdate(this.id, update, getRoomById(id)));
|
||||
await database.storeRoomUpdate(this.id, update, getRoomById(id));
|
||||
}
|
||||
onRoomUpdate.add(update);
|
||||
|
||||
|
@ -1235,45 +1232,44 @@ class Client {
|
|||
if (room['state'] is Map<String, dynamic> &&
|
||||
room['state']['events'] is List<dynamic> &&
|
||||
room['state']['events'].isNotEmpty) {
|
||||
_handleRoomEvents(id, room['state']['events'], 'state', dbActions);
|
||||
await _handleRoomEvents(id, room['state']['events'], 'state');
|
||||
handledEvents = true;
|
||||
}
|
||||
|
||||
if (room['invite_state'] is Map<String, dynamic> &&
|
||||
room['invite_state']['events'] is List<dynamic>) {
|
||||
_handleRoomEvents(
|
||||
id, room['invite_state']['events'], 'invite_state', dbActions);
|
||||
await _handleRoomEvents(
|
||||
id, room['invite_state']['events'], 'invite_state');
|
||||
}
|
||||
|
||||
if (room['timeline'] is Map<String, dynamic> &&
|
||||
room['timeline']['events'] is List<dynamic> &&
|
||||
room['timeline']['events'].isNotEmpty) {
|
||||
_handleRoomEvents(
|
||||
id, room['timeline']['events'], 'timeline', dbActions);
|
||||
await _handleRoomEvents(
|
||||
id, room['timeline']['events'], 'timeline');
|
||||
handledEvents = true;
|
||||
}
|
||||
|
||||
if (room['ephemeral'] is Map<String, dynamic> &&
|
||||
room['ephemeral']['events'] is List<dynamic>) {
|
||||
_handleEphemerals(id, room['ephemeral']['events'], dbActions);
|
||||
await _handleEphemerals(id, room['ephemeral']['events']);
|
||||
}
|
||||
|
||||
if (room['account_data'] is Map<String, dynamic> &&
|
||||
room['account_data']['events'] is List<dynamic>) {
|
||||
_handleRoomEvents(
|
||||
id, room['account_data']['events'], 'account_data', dbActions);
|
||||
await _handleRoomEvents(
|
||||
id, room['account_data']['events'], 'account_data');
|
||||
}
|
||||
|
||||
if (handledEvents && database != null && roomObj != null) {
|
||||
dbActions.add(() => roomObj.updateSortOrder());
|
||||
await roomObj.updateSortOrder();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void _handleEphemerals(String id, List<dynamic> events,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
Future<void> _handleEphemerals(String id, List<dynamic> events) async {
|
||||
for (num i = 0; i < events.length; i++) {
|
||||
_handleEvent(events[i], id, 'ephemeral', dbActions);
|
||||
await _handleEvent(events[i], id, 'ephemeral');
|
||||
|
||||
// Receipt events are deltas between two states. We will create a
|
||||
// fake room account data event for this and store the difference
|
||||
|
@ -1310,20 +1306,18 @@ class Client {
|
|||
}
|
||||
}
|
||||
events[i]['content'] = receiptStateContent;
|
||||
_handleEvent(events[i], id, 'account_data', dbActions);
|
||||
await _handleEvent(events[i], id, 'account_data');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _handleRoomEvents(String chat_id, List<dynamic> events, String type,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
Future<void> _handleRoomEvents(String chat_id, List<dynamic> events, String type) async {
|
||||
for (num i = 0; i < events.length; i++) {
|
||||
_handleEvent(events[i], chat_id, type, dbActions);
|
||||
await _handleEvent(events[i], chat_id, type);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleGlobalEvents(List<dynamic> events, String type,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
Future<void> _handleGlobalEvents(List<dynamic> events, String type) async {
|
||||
for (var i = 0; i < events.length; i++) {
|
||||
if (events[i]['type'] is String &&
|
||||
events[i]['content'] is Map<String, dynamic>) {
|
||||
|
@ -1333,15 +1327,14 @@ class Client {
|
|||
content: events[i],
|
||||
);
|
||||
if (database != null) {
|
||||
dbActions.add(() => database.storeUserEventUpdate(id, update));
|
||||
await database.storeUserEventUpdate(id, update);
|
||||
}
|
||||
onUserEvent.add(update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _handleEvent(Map<String, dynamic> event, String roomID, String type,
|
||||
List<Future<dynamic> Function()> dbActions) {
|
||||
Future<void> _handleEvent(Map<String, dynamic> event, String roomID, String type) async {
|
||||
if (event['type'] is String && event['content'] is Map<String, dynamic>) {
|
||||
// The client must ignore any new m.room.encryption event to prevent
|
||||
// man-in-the-middle attacks!
|
||||
|
@ -1365,7 +1358,7 @@ class Client {
|
|||
update = update.decrypt(room);
|
||||
}
|
||||
if (type != 'ephemeral' && database != null) {
|
||||
dbActions.add(() => database.storeEventUpdate(id, update));
|
||||
await database.storeEventUpdate(id, update);
|
||||
}
|
||||
_updateRoomsByEventUpdate(update);
|
||||
onEvent.add(update);
|
||||
|
|
|
@ -116,19 +116,6 @@ class Timeline {
|
|||
try {
|
||||
if (eventUpdate.roomID != room.id) return;
|
||||
|
||||
// try to decrypt the event first, if needed
|
||||
if (eventUpdate.eventType == 'm.room.encrypted' && room.client.database != null) {
|
||||
try {
|
||||
await room.loadInboundGroupSessionKey(eventUpdate.content['content']['session_id']);
|
||||
eventUpdate = eventUpdate.decrypt(room);
|
||||
if (eventUpdate.eventType != 'm.room.encrypted') {
|
||||
await room.client.database.storeEventUpdate(room.client.id, eventUpdate);
|
||||
}
|
||||
} catch (err) {
|
||||
print('[WARNING] (_handleEventUpdate) Failed to decrypt event: ${err.toString()}');
|
||||
}
|
||||
}
|
||||
|
||||
if (eventUpdate.type == 'timeline' || eventUpdate.type == 'history') {
|
||||
// Redaction events are handled as modification for existing events.
|
||||
if (eventUpdate.eventType == 'm.room.redaction') {
|
||||
|
|
|
@ -207,7 +207,7 @@ void main() {
|
|||
.verified,
|
||||
false);
|
||||
|
||||
matrix.handleSync({
|
||||
await matrix.handleSync({
|
||||
'device_lists': {
|
||||
'changed': [
|
||||
'@alice:example.com',
|
||||
|
@ -221,7 +221,7 @@ void main() {
|
|||
expect(matrix.userDeviceKeys.length, 2);
|
||||
expect(matrix.userDeviceKeys['@alice:example.com'].outdated, true);
|
||||
|
||||
matrix.handleSync({
|
||||
await matrix.handleSync({
|
||||
'rooms': {
|
||||
'join': {
|
||||
'!726s6s6q:example.com': {
|
||||
|
@ -527,7 +527,7 @@ void main() {
|
|||
test('Track oneTimeKeys', () async {
|
||||
if (matrix.encryptionEnabled) {
|
||||
var last = matrix.lastTimeKeysUploaded ?? DateTime.now();
|
||||
matrix.handleSync({
|
||||
await matrix.handleSync({
|
||||
'device_one_time_keys_count': {'signed_curve25519': 49}
|
||||
});
|
||||
await Future.delayed(Duration(milliseconds: 50));
|
||||
|
@ -542,7 +542,7 @@ void main() {
|
|||
expect(matrix.rooms[1].outboundGroupSession == null, true);
|
||||
await matrix.rooms[1].createOutboundGroupSession();
|
||||
expect(matrix.rooms[1].outboundGroupSession != null, true);
|
||||
matrix.handleSync({
|
||||
await matrix.handleSync({
|
||||
'device_lists': {
|
||||
'changed': [
|
||||
'@alice:example.com',
|
||||
|
@ -562,7 +562,7 @@ void main() {
|
|||
expect(matrix.rooms[1].outboundGroupSession == null, true);
|
||||
await matrix.rooms[1].createOutboundGroupSession();
|
||||
expect(matrix.rooms[1].outboundGroupSession != null, true);
|
||||
matrix.handleSync({
|
||||
await matrix.handleSync({
|
||||
'rooms': {
|
||||
'join': {
|
||||
'!726s6s6q:example.com': {
|
||||
|
|
Loading…
Reference in a new issue