Merge branch 'soru/async-sync-processing' into 'master'

decrypt events in sync loop, making it async

See merge request famedly/famedlysdk!310
This commit is contained in:
Sorunome 2020-05-19 08:14:46 +00:00
commit a613c84e27
3 changed files with 54 additions and 69 deletions

View file

@ -986,16 +986,17 @@ class Client {
final syncResp = await _syncRequest;
if (hash != _syncRequest.hashCode) return;
_timeoutFactor = 1;
final futures = handleSync(syncResp);
if (database != null) {
await database.transaction(() async {
await handleSync(syncResp);
if (prevBatch != syncResp['next_batch']) {
await database.storePrevBatch(syncResp['next_batch'], id);
}
});
} else {
await handleSync(syncResp);
}
if (_disposed) return;
await database?.transaction(() async {
for (final f in futures) {
await f();
}
if (prevBatch != syncResp['next_batch']) {
await database.storePrevBatch(syncResp['next_batch'], id);
}
});
if (prevBatch == null) {
onFirstSync.add(true);
prevBatch = syncResp['next_batch'];
@ -1015,34 +1016,33 @@ class Client {
}
/// Use this method only for testing utilities!
List<Future<dynamic> Function()> handleSync(dynamic sync) {
final dbActions = <Future<dynamic> Function()>[];
Future<void> handleSync(dynamic sync) async {
if (sync['to_device'] is Map<String, dynamic> &&
sync['to_device']['events'] is List<dynamic>) {
_handleToDeviceEvents(sync['to_device']['events']);
}
if (sync['rooms'] is Map<String, dynamic>) {
if (sync['rooms']['join'] is Map<String, dynamic>) {
_handleRooms(sync['rooms']['join'], Membership.join, dbActions);
await _handleRooms(sync['rooms']['join'], Membership.join);
}
if (sync['rooms']['invite'] is Map<String, dynamic>) {
_handleRooms(sync['rooms']['invite'], Membership.invite, dbActions);
await _handleRooms(sync['rooms']['invite'], Membership.invite);
}
if (sync['rooms']['leave'] is Map<String, dynamic>) {
_handleRooms(sync['rooms']['leave'], Membership.leave, dbActions);
await _handleRooms(sync['rooms']['leave'], Membership.leave);
}
}
if (sync['presence'] is Map<String, dynamic> &&
sync['presence']['events'] is List<dynamic>) {
_handleGlobalEvents(sync['presence']['events'], 'presence', dbActions);
await _handleGlobalEvents(sync['presence']['events'], 'presence');
}
if (sync['account_data'] is Map<String, dynamic> &&
sync['account_data']['events'] is List<dynamic>) {
_handleGlobalEvents(
sync['account_data']['events'], 'account_data', dbActions);
await _handleGlobalEvents(
sync['account_data']['events'], 'account_data');
}
if (sync['device_lists'] is Map<String, dynamic>) {
_handleDeviceListsEvents(sync['device_lists'], dbActions);
await _handleDeviceListsEvents(sync['device_lists']);
}
if (sync['device_one_time_keys_count'] is Map<String, dynamic>) {
_handleDeviceOneTimeKeysCount(sync['device_one_time_keys_count']);
@ -1054,7 +1054,6 @@ class Client {
);
}
onSync.add(sync);
return dbActions;
}
void _handleDeviceOneTimeKeysCount(
@ -1071,15 +1070,13 @@ class Client {
}
}
void _handleDeviceListsEvents(Map<String, dynamic> deviceLists,
List<Future<dynamic> Function()> dbActions) {
Future<void> _handleDeviceListsEvents(Map<String, dynamic> deviceLists) async {
if (deviceLists['changed'] is List) {
for (final userId in deviceLists['changed']) {
if (_userDeviceKeys.containsKey(userId)) {
_userDeviceKeys[userId].outdated = true;
if (database != null) {
dbActions
.add(() => database.storeUserDeviceKeysInfo(id, userId, true));
await database.storeUserDeviceKeysInfo(id, userId, true);
}
}
}
@ -1175,9 +1172,10 @@ class Client {
}
}
void _handleRooms(Map<String, dynamic> rooms, Membership membership,
List<Future<dynamic> Function()> dbActions) {
rooms.forEach((String id, dynamic room) {
Future<void> _handleRooms(Map<String, dynamic> rooms, Membership membership) async {
for (final entry in rooms.entries) {
final id = entry.key;
final room = entry.value;
// calculate the notification counts, the limitedTimeline and prevbatch
num highlight_count = 0;
num notification_count = 0;
@ -1224,8 +1222,7 @@ class Client {
roomObj.resetSortOrder();
}
if (database != null) {
dbActions.add(
() => database.storeRoomUpdate(this.id, update, getRoomById(id)));
await database.storeRoomUpdate(this.id, update, getRoomById(id));
}
onRoomUpdate.add(update);
@ -1235,45 +1232,44 @@ class Client {
if (room['state'] is Map<String, dynamic> &&
room['state']['events'] is List<dynamic> &&
room['state']['events'].isNotEmpty) {
_handleRoomEvents(id, room['state']['events'], 'state', dbActions);
await _handleRoomEvents(id, room['state']['events'], 'state');
handledEvents = true;
}
if (room['invite_state'] is Map<String, dynamic> &&
room['invite_state']['events'] is List<dynamic>) {
_handleRoomEvents(
id, room['invite_state']['events'], 'invite_state', dbActions);
await _handleRoomEvents(
id, room['invite_state']['events'], 'invite_state');
}
if (room['timeline'] is Map<String, dynamic> &&
room['timeline']['events'] is List<dynamic> &&
room['timeline']['events'].isNotEmpty) {
_handleRoomEvents(
id, room['timeline']['events'], 'timeline', dbActions);
await _handleRoomEvents(
id, room['timeline']['events'], 'timeline');
handledEvents = true;
}
if (room['ephemeral'] is Map<String, dynamic> &&
room['ephemeral']['events'] is List<dynamic>) {
_handleEphemerals(id, room['ephemeral']['events'], dbActions);
await _handleEphemerals(id, room['ephemeral']['events']);
}
if (room['account_data'] is Map<String, dynamic> &&
room['account_data']['events'] is List<dynamic>) {
_handleRoomEvents(
id, room['account_data']['events'], 'account_data', dbActions);
await _handleRoomEvents(
id, room['account_data']['events'], 'account_data');
}
if (handledEvents && database != null && roomObj != null) {
dbActions.add(() => roomObj.updateSortOrder());
await roomObj.updateSortOrder();
}
});
}
}
void _handleEphemerals(String id, List<dynamic> events,
List<Future<dynamic> Function()> dbActions) {
Future<void> _handleEphemerals(String id, List<dynamic> events) async {
for (num i = 0; i < events.length; i++) {
_handleEvent(events[i], id, 'ephemeral', dbActions);
await _handleEvent(events[i], id, 'ephemeral');
// Receipt events are deltas between two states. We will create a
// fake room account data event for this and store the difference
@ -1310,20 +1306,18 @@ class Client {
}
}
events[i]['content'] = receiptStateContent;
_handleEvent(events[i], id, 'account_data', dbActions);
await _handleEvent(events[i], id, 'account_data');
}
}
}
void _handleRoomEvents(String chat_id, List<dynamic> events, String type,
List<Future<dynamic> Function()> dbActions) {
Future<void> _handleRoomEvents(String chat_id, List<dynamic> events, String type) async {
for (num i = 0; i < events.length; i++) {
_handleEvent(events[i], chat_id, type, dbActions);
await _handleEvent(events[i], chat_id, type);
}
}
void _handleGlobalEvents(List<dynamic> events, String type,
List<Future<dynamic> Function()> dbActions) {
Future<void> _handleGlobalEvents(List<dynamic> events, String type) async {
for (var i = 0; i < events.length; i++) {
if (events[i]['type'] is String &&
events[i]['content'] is Map<String, dynamic>) {
@ -1333,15 +1327,14 @@ class Client {
content: events[i],
);
if (database != null) {
dbActions.add(() => database.storeUserEventUpdate(id, update));
await database.storeUserEventUpdate(id, update);
}
onUserEvent.add(update);
}
}
}
void _handleEvent(Map<String, dynamic> event, String roomID, String type,
List<Future<dynamic> Function()> dbActions) {
Future<void> _handleEvent(Map<String, dynamic> event, String roomID, String type) async {
if (event['type'] is String && event['content'] is Map<String, dynamic>) {
// The client must ignore any new m.room.encryption event to prevent
// man-in-the-middle attacks!
@ -1365,8 +1358,13 @@ class Client {
if (event['type'] == 'm.room.encrypted') {
update = update.decrypt(room);
}
if (update.eventType == 'm.room.encrypted' && database != null) {
// the event is still encrytped....let's try fetching the keys from the database!
await room.loadInboundGroupSessionKey(event['content']['session_id']);
update = update.decrypt(room);
}
if (type != 'ephemeral' && database != null) {
dbActions.add(() => database.storeEventUpdate(id, update));
await database.storeEventUpdate(id, update);
}
_updateRoomsByEventUpdate(update);
onEvent.add(update);

View file

@ -116,19 +116,6 @@ class Timeline {
try {
if (eventUpdate.roomID != room.id) return;
// try to decrypt the event first, if needed
if (eventUpdate.eventType == 'm.room.encrypted' && room.client.database != null) {
try {
await room.loadInboundGroupSessionKey(eventUpdate.content['content']['session_id']);
eventUpdate = eventUpdate.decrypt(room);
if (eventUpdate.eventType != 'm.room.encrypted') {
await room.client.database.storeEventUpdate(room.client.id, eventUpdate);
}
} catch (err) {
print('[WARNING] (_handleEventUpdate) Failed to decrypt event: ${err.toString()}');
}
}
if (eventUpdate.type == 'timeline' || eventUpdate.type == 'history') {
// Redaction events are handled as modification for existing events.
if (eventUpdate.eventType == 'm.room.redaction') {

View file

@ -207,7 +207,7 @@ void main() {
.verified,
false);
matrix.handleSync({
await matrix.handleSync({
'device_lists': {
'changed': [
'@alice:example.com',
@ -221,7 +221,7 @@ void main() {
expect(matrix.userDeviceKeys.length, 2);
expect(matrix.userDeviceKeys['@alice:example.com'].outdated, true);
matrix.handleSync({
await matrix.handleSync({
'rooms': {
'join': {
'!726s6s6q:example.com': {
@ -527,7 +527,7 @@ void main() {
test('Track oneTimeKeys', () async {
if (matrix.encryptionEnabled) {
var last = matrix.lastTimeKeysUploaded ?? DateTime.now();
matrix.handleSync({
await matrix.handleSync({
'device_one_time_keys_count': {'signed_curve25519': 49}
});
await Future.delayed(Duration(milliseconds: 50));
@ -542,7 +542,7 @@ void main() {
expect(matrix.rooms[1].outboundGroupSession == null, true);
await matrix.rooms[1].createOutboundGroupSession();
expect(matrix.rooms[1].outboundGroupSession != null, true);
matrix.handleSync({
await matrix.handleSync({
'device_lists': {
'changed': [
'@alice:example.com',
@ -562,7 +562,7 @@ void main() {
expect(matrix.rooms[1].outboundGroupSession == null, true);
await matrix.rooms[1].createOutboundGroupSession();
expect(matrix.rooms[1].outboundGroupSession != null, true);
matrix.handleSync({
await matrix.handleSync({
'rooms': {
'join': {
'!726s6s6q:example.com': {