using System.Reactive.Linq; using Serilog; using Websocket.Client; namespace Discord.API; public abstract class AbstractGateway { protected readonly IWebsocketClient WebsocketClient; protected readonly TimeProvider TimeProvider; private List Subscriptions = new(3); public AbstractGateway(IWebsocketClient client, TimeProvider time_provider){ this.WebsocketClient = client; this.TimeProvider = time_provider; Subscriptions.Add(WebsocketClient.DisconnectionHappened.Subscribe(DisconnectHandlerInternal)); Subscriptions.Add(WebsocketClient.ReconnectionHappened.Subscribe(ReconnectHandler)); Subscriptions.Add(WebsocketClient.MessageReceived.Subscribe(MessageReceivedHandler)); WebsocketClient.Start(); } private void DisconnectHandlerInternal(DisconnectionInfo info){ StopHeartbeat(); DisconnectHandler(info); } protected abstract void DisconnectHandler(DisconnectionInfo info); protected abstract void ReconnectHandler(ReconnectionInfo info); protected abstract void MessageReceivedHandler(ResponseMessage message); #region Heartbeat private CancellationTokenSource? HeartbeatCts; private CancellationTokenSource? InstantHeartbeatCts; private DateTime? HeartbeatSent; protected int HeartbeatPing {get; private set;} private bool HeartbeatAcked; /// /// Sends a heartbeat packet to the remote /// protected abstract Task SendHeartbeat(); /// /// Handler for when the heartbeat ack was not received /// /// Indicates if the heartbeat loop should exit protected abstract Task MissingHeartbeatAckHandler(); private async Task HeartbeatTask(int heartbeat_interval){ CancellationToken ct = HeartbeatCts!.Token; InstantHeartbeatCts = new(); HeartbeatSent = null; await Task.Delay( TimeSpan.FromMilliseconds(Random.Shared.Next(1, heartbeat_interval)), TimeProvider, InstantHeartbeatCts.Token); if(ct.IsCancellationRequested) return; // Check if the task was cancelled when we were waiting using PeriodicTimer pd = new(TimeSpan.FromMilliseconds(heartbeat_interval), TimeProvider); HeartbeatAcked = true; do{ if(!HeartbeatAcked){ if(await MissingHeartbeatAckHandler()){ break; } } await SendHeartbeat(); HeartbeatSent = TimeProvider.GetUtcNow().DateTime; HeartbeatAcked = false; // Create new InstantHeartbeatCts if it was used if(InstantHeartbeatCts.IsCancellationRequested) InstantHeartbeatCts = new(); } while(await pd.WaitForNextTickAsync(InstantHeartbeatCts.Token) && !ct.IsCancellationRequested); } protected int HeartbeatAckReceived(){ HeartbeatAcked = true; if(HeartbeatSent == null){ Log.Warning("GATEWAY(abstract): HeartbeatAck received before heartbeat was sent"); return -1; } HeartbeatPing = (int)(TimeProvider.GetUtcNow().DateTime - HeartbeatSent.Value).TotalMilliseconds; return HeartbeatPing; } protected void StartHeartbeat(int heartbeat_interval){ StopHeartbeat(); HeartbeatCts = new(); Task.Run(() => HeartbeatTask(heartbeat_interval), HeartbeatCts.Token); } protected void StopHeartbeat(){ HeartbeatCts?.Cancel(); InstantHeartbeatCts?.Cancel(); } #endregion public async Task Close(){ StopHeartbeat(); await WebsocketClient.Stop(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "Shutdown"); foreach (var sub in Subscriptions) { sub.Dispose(); } } }