feat: When Jobs websocket subscription disconnects, fall back to usual queries for two minutes

Resolves #542
This commit is contained in:
Inex Code 2024-08-06 19:46:52 +03:00
parent 449b9ebcb6
commit eb67c70fa0
3 changed files with 34 additions and 7 deletions

View file

@ -97,12 +97,15 @@ abstract class GraphQLApiMap {
); );
} }
Future<GraphQLClient> getSubscriptionClient() async { Future<GraphQLClient> getSubscriptionClient({
final Future<Duration?>? Function(int?, String?)? onConnectionLost,
}) async {
final WebSocketLink webSocketLink = WebSocketLink( final WebSocketLink webSocketLink = WebSocketLink(
'ws://api.$rootAddress/graphql', 'ws://api.$rootAddress/graphql',
// Only [GraphQLProtocol.graphqlTransportWs] supports automatic pings, so we don't disconnect when nothing happens. // Only [GraphQLProtocol.graphqlTransportWs] supports automatic pings, so we don't disconnect when nothing happens.
subProtocol: GraphQLProtocol.graphqlTransportWs, subProtocol: GraphQLProtocol.graphqlTransportWs,
config: SocketClientConfig( config: SocketClientConfig(
onConnectionLost: onConnectionLost,
autoReconnect: true, autoReconnect: true,
initialPayload: _token.isEmpty initialPayload: _token.isEmpty
? null ? null

View file

@ -22,8 +22,12 @@ mixin JobsApi on GraphQLApiMap {
return jobsList; return jobsList;
} }
Stream<List<ServerJob>> getServerJobsStream() async* { Stream<List<ServerJob>> getServerJobsStream({
final GraphQLClient client = await getSubscriptionClient(); final Future<Duration?>? Function(int?, String?)? onConnectionLost,
}) async* {
final GraphQLClient client = await getSubscriptionClient(
onConnectionLost: onConnectionLost,
);
final subscription = client.subscribe$JobUpdates(); final subscription = client.subscribe$JobUpdates();
await for (final response in subscription) { await for (final response in subscription) {
final jobsList = response.parsedData?.jobUpdates final jobsList = response.parsedData?.jobUpdates

View file

@ -45,6 +45,7 @@ class ApiConnectionRepository {
Timer? _timer; Timer? _timer;
StreamSubscription<List<ServerJob>>? _serverJobsStreamSubscription; StreamSubscription<List<ServerJob>>? _serverJobsStreamSubscription;
DateTime? _jobsStreamDisconnectTime;
Future<void> removeServerJob(final String uid) async { Future<void> removeServerJob(final String uid) async {
await api.removeApiJob(uid); await api.removeApiJob(uid);
@ -291,10 +292,12 @@ class ApiConnectionRepository {
if (VersionConstraint.parse(wsJobsUpdatesSupportedVersion) if (VersionConstraint.parse(wsJobsUpdatesSupportedVersion)
.allows(Version.parse(apiVersion))) { .allows(Version.parse(apiVersion))) {
_serverJobsStreamSubscription = _serverJobsStreamSubscription = api
api.getServerJobsStream().listen((final List<ServerJob> jobs) { .getServerJobsStream(onConnectionLost: _handleWebsocketDisconnect)
.listen((final List<ServerJob> jobs) {
_apiData.serverJobs.data = jobs; _apiData.serverJobs.data = jobs;
_dataStream.add(_apiData); _dataStream.add(_apiData);
_jobsStreamDisconnectTime = null;
}); });
} }
@ -314,9 +317,26 @@ class ApiConnectionRepository {
static const String wsJobsUpdatesSupportedVersion = '>=3.3.0'; static const String wsJobsUpdatesSupportedVersion = '>=3.3.0';
bool _isForceServerJobsRefetchRequired() {
if (_serverJobsStreamSubscription == null) {
return true;
}
return _apiData.serverJobs.data == null ||
(_jobsStreamDisconnectTime != null &&
DateTime.now().difference(_jobsStreamDisconnectTime!) <
const Duration(seconds: 120));
}
Future<Duration?>? _handleWebsocketDisconnect(
final int? code,
final String? reason,
) {
_jobsStreamDisconnectTime = DateTime.now();
return null;
}
Future<void> _refetchEverything(final Version version) async { Future<void> _refetchEverything(final Version version) async {
if (_serverJobsStreamSubscription == null || if (_isForceServerJobsRefetchRequired()) {
_apiData.serverJobs.data == null) {
await _apiData.serverJobs await _apiData.serverJobs
.refetchData(version, () => _dataStream.add(_apiData)); .refetchData(version, () => _dataStream.add(_apiData));
} }