diff --git a/lib/logic/api_maps/graphql_maps/graphql_api_map.dart b/lib/logic/api_maps/graphql_maps/graphql_api_map.dart index 80d345b8..511ae78f 100644 --- a/lib/logic/api_maps/graphql_maps/graphql_api_map.dart +++ b/lib/logic/api_maps/graphql_maps/graphql_api_map.dart @@ -97,12 +97,15 @@ abstract class GraphQLApiMap { ); } - Future getSubscriptionClient() async { + Future getSubscriptionClient({ + final Future? Function(int?, String?)? onConnectionLost, + }) async { final WebSocketLink webSocketLink = WebSocketLink( 'ws://api.$rootAddress/graphql', // Only [GraphQLProtocol.graphqlTransportWs] supports automatic pings, so we don't disconnect when nothing happens. subProtocol: GraphQLProtocol.graphqlTransportWs, config: SocketClientConfig( + onConnectionLost: onConnectionLost, autoReconnect: true, initialPayload: _token.isEmpty ? null diff --git a/lib/logic/api_maps/graphql_maps/server_api/jobs_api.dart b/lib/logic/api_maps/graphql_maps/server_api/jobs_api.dart index 8f667465..50149b7e 100644 --- a/lib/logic/api_maps/graphql_maps/server_api/jobs_api.dart +++ b/lib/logic/api_maps/graphql_maps/server_api/jobs_api.dart @@ -22,8 +22,12 @@ mixin JobsApi on GraphQLApiMap { return jobsList; } - Stream> getServerJobsStream() async* { - final GraphQLClient client = await getSubscriptionClient(); + Stream> getServerJobsStream({ + final Future? Function(int?, String?)? onConnectionLost, + }) async* { + final GraphQLClient client = await getSubscriptionClient( + onConnectionLost: onConnectionLost, + ); final subscription = client.subscribe$JobUpdates(); await for (final response in subscription) { final jobsList = response.parsedData?.jobUpdates diff --git a/lib/logic/get_it/api_connection_repository.dart b/lib/logic/get_it/api_connection_repository.dart index 99f5d9b8..885c90f0 100644 --- a/lib/logic/get_it/api_connection_repository.dart +++ b/lib/logic/get_it/api_connection_repository.dart @@ -45,6 +45,7 @@ class ApiConnectionRepository { Timer? _timer; StreamSubscription>? _serverJobsStreamSubscription; + DateTime? _jobsStreamDisconnectTime; Future removeServerJob(final String uid) async { await api.removeApiJob(uid); @@ -291,10 +292,12 @@ class ApiConnectionRepository { if (VersionConstraint.parse(wsJobsUpdatesSupportedVersion) .allows(Version.parse(apiVersion))) { - _serverJobsStreamSubscription = - api.getServerJobsStream().listen((final List jobs) { + _serverJobsStreamSubscription = api + .getServerJobsStream(onConnectionLost: _handleWebsocketDisconnect) + .listen((final List jobs) { _apiData.serverJobs.data = jobs; _dataStream.add(_apiData); + _jobsStreamDisconnectTime = null; }); } @@ -314,9 +317,26 @@ class ApiConnectionRepository { static const String wsJobsUpdatesSupportedVersion = '>=3.3.0'; + bool _isForceServerJobsRefetchRequired() { + if (_serverJobsStreamSubscription == null) { + return true; + } + return _apiData.serverJobs.data == null || + (_jobsStreamDisconnectTime != null && + DateTime.now().difference(_jobsStreamDisconnectTime!) < + const Duration(seconds: 120)); + } + + Future? _handleWebsocketDisconnect( + final int? code, + final String? reason, + ) { + _jobsStreamDisconnectTime = DateTime.now(); + return null; + } + Future _refetchEverything(final Version version) async { - if (_serverJobsStreamSubscription == null || - _apiData.serverJobs.data == null) { + if (_isForceServerJobsRefetchRequired()) { await _apiData.serverJobs .refetchData(version, () => _dataStream.add(_apiData)); }