DiscordApi/Discord.API/Gateway/AbstractGateway.cs

113 lines
3.9 KiB
C#

using System.Reactive.Linq;
using Serilog;
using Websocket.Client;
namespace Discord.API;
/// <summary>
/// Base class for a gateway connection built on top of an <see cref="IWebsocketClient"/>.
/// It wires the client's disconnect / reconnect / message streams to abstract handlers
/// and runs a background heartbeat loop that derived classes start and stop.
/// </summary>
public abstract class AbstractGateway {
    /// <summary>Websocket client this gateway sends and receives through.</summary>
    protected readonly IWebsocketClient WebsocketClient;
    /// <summary>Clock used for the heartbeat delay, the periodic timer and the ping measurement.</summary>
    protected readonly TimeProvider TimeProvider;
    // Subscriptions to the three client streams; disposed in Close().
    private readonly List<IDisposable> Subscriptions = new(3);

    /// <summary>
    /// Subscribes to the client's disconnection, reconnection and message streams and starts the client.
    /// </summary>
    /// <param name="client">Websocket client to listen on and start.</param>
    /// <param name="time_provider">Time source used by the heartbeat logic.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public AbstractGateway(IWebsocketClient client, TimeProvider time_provider){
        ArgumentNullException.ThrowIfNull(client);
        ArgumentNullException.ThrowIfNull(time_provider);
        this.WebsocketClient = client;
        this.TimeProvider = time_provider;
        Subscriptions.Add(WebsocketClient.DisconnectionHappened.Subscribe(DisconnectHandlerInternal));
        Subscriptions.Add(WebsocketClient.ReconnectionHappened.Subscribe(ReconnectHandler));
        Subscriptions.Add(WebsocketClient.MessageReceived.Subscribe(MessageReceivedHandler));
        // NOTE(review): the start is not awaited (a constructor cannot await) and the handlers are
        // abstract, so they can presumably fire before a derived constructor has finished - confirm
        // that derived classes tolerate this.
        WebsocketClient.Start();
    }

    // Stops the heartbeat loop before forwarding the disconnect to the derived class.
    private void DisconnectHandlerInternal(DisconnectionInfo info){
        StopHeartbeat();
        DisconnectHandler(info);
    }

    /// <summary>Called after the heartbeat has been stopped because the websocket disconnected.</summary>
    protected abstract void DisconnectHandler(DisconnectionInfo info);
    /// <summary>Called when the websocket client reports a (re)connection.</summary>
    protected abstract void ReconnectHandler(ReconnectionInfo info);
    /// <summary>Called for every message received from the websocket client.</summary>
    protected abstract void MessageReceivedHandler(ResponseMessage message);

    #region Heartbeat
    // Cancelled to terminate the heartbeat loop.
    private CancellationTokenSource? HeartbeatCts;
    // Cancelled to cut the current wait short; the loop replaces it after every use.
    private CancellationTokenSource? InstantHeartbeatCts;
    // Time at which the most recent heartbeat was issued; null until the first one.
    private DateTime? HeartbeatSent;
    /// <summary>Milliseconds between the last heartbeat being issued and its ack being reported.</summary>
    protected int HeartbeatPing {get; private set;}
    // Written by HeartbeatAckReceived() and read by the heartbeat loop, potentially on different threads.
    private volatile bool HeartbeatAcked;

    /// <summary>
    /// Sends a heartbeat packet to the remote
    /// </summary>
    protected abstract Task SendHeartbeat();

    /// <summary>
    /// Handler for when the heartbeat ack was not received
    /// </summary>
    /// <returns>Indicates if the heartbeat loop should exit</returns>
    protected abstract Task<bool> MissingHeartbeatAckHandler();

    // Returns a token for the next wait, replacing the wake-up source if it has already been used.
    private CancellationToken RenewInstantHeartbeatToken(){
        CancellationTokenSource? current = InstantHeartbeatCts;
        if(current is null || current.IsCancellationRequested){
            current = new();
            InstantHeartbeatCts = current;
        }
        return current.Token;
    }

    // Heartbeat loop: waits a random fraction of the interval, then sends a heartbeat on every timer
    // tick (or earlier when woken through InstantHeartbeatCts) until ct is cancelled, the timer is
    // disposed or MissingHeartbeatAckHandler asks to exit.
    private async Task HeartbeatTask(int heartbeat_interval, CancellationToken ct){
        try{
            HeartbeatSent = null;
            TimeSpan initial_delay = TimeSpan.FromMilliseconds(Random.Shared.Next(1, heartbeat_interval));
            try{
                // The source created by StartHeartbeat is used as-is (not renewed) so that a wake-up
                // requested before this task got scheduled is still honoured.
                await Task.Delay(initial_delay, TimeProvider, InstantHeartbeatCts?.Token ?? CancellationToken.None);
            }catch(OperationCanceledException){
                // Woken early: either a stop (checked below) or an immediate heartbeat request.
            }
            if(ct.IsCancellationRequested) return;

            using PeriodicTimer timer = new(TimeSpan.FromMilliseconds(heartbeat_interval), TimeProvider);
            HeartbeatAcked = true;
            while(true){
                if(!HeartbeatAcked && await MissingHeartbeatAckHandler()){
                    return;
                }
                // Record the attempt BEFORE sending: an ack that arrives while SendHeartbeat is still
                // completing must not be overwritten by a late "HeartbeatAcked = false".
                HeartbeatSent = TimeProvider.GetUtcNow().DateTime;
                HeartbeatAcked = false;
                await SendHeartbeat();

                // Renew first, then check ct: StopHeartbeat cancels HeartbeatCts before
                // InstantHeartbeatCts, so a stop is either seen here or cancels the wait below.
                CancellationToken wake = RenewInstantHeartbeatToken();
                if(ct.IsCancellationRequested) return;
                try{
                    if(!await timer.WaitForNextTickAsync(wake)) return;
                }catch(OperationCanceledException){
                    // Only this wait is cancelled; the timer itself keeps ticking.
                }
                if(ct.IsCancellationRequested) return;
            }
        }catch(OperationCanceledException) when(ct.IsCancellationRequested){
            // Stopped while sending / handling a missing ack - nothing to report.
        }catch(Exception ex){
            // The task is fire-and-forget, so this is the only place a failure can be surfaced.
            Log.Error(ex, "GATEWAY(abstract): Heartbeat loop terminated by an unhandled exception");
        }
    }

    /// <summary>
    /// Marks the pending heartbeat as acknowledged and updates <see cref="HeartbeatPing"/>.
    /// </summary>
    /// <returns>The measured ping in milliseconds, or -1 when no heartbeat has been sent yet.</returns>
    protected int HeartbeatAckReceived(){
        HeartbeatAcked = true;
        DateTime? sent = HeartbeatSent;
        if(sent is null){
            Log.Warning("GATEWAY(abstract): HeartbeatAck received before heartbeat was sent");
            return -1;
        }
        HeartbeatPing = (int)(TimeProvider.GetUtcNow().DateTime - sent.Value).TotalMilliseconds;
        return HeartbeatPing;
    }

    /// <summary>
    /// Stops any running heartbeat loop and starts a new one in the background.
    /// </summary>
    /// <param name="heartbeat_interval">Interval between heartbeats in milliseconds; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="heartbeat_interval"/> is zero or negative.</exception>
    protected void StartHeartbeat(int heartbeat_interval){
        // Previously this failed inside the background task, where nobody could observe it.
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(heartbeat_interval);
        StopHeartbeat();
        CancellationTokenSource stop_source = new();
        HeartbeatCts = stop_source;
        // Created here rather than inside the task so an ImmediateHeartbeat() issued right after
        // this call is not lost.
        InstantHeartbeatCts = new();
        // Hand the token over explicitly so the loop can never pick up a later loop's source.
        CancellationToken ct = stop_source.Token;
        _ = Task.Run(() => HeartbeatTask(heartbeat_interval, ct), ct);
    }

    /// <summary>Requests the heartbeat loop to exit and wakes it if it is currently waiting.</summary>
    protected void StopHeartbeat(){
        HeartbeatCts?.Cancel();
        InstantHeartbeatCts?.Cancel();
    }

    /// <summary>
    /// Wakes the heartbeat loop so that it sends a heartbeat now instead of at the next tick.
    /// </summary>
    protected void ImmediateHeartbeat(){
        // NOTE(review): the loop still runs its missing-ack check before sending, so a wake-up that
        // arrives while the previous heartbeat is unacknowledged invokes MissingHeartbeatAckHandler.
        InstantHeartbeatCts?.Cancel();
    }
    #endregion

    /// <summary>
    /// Stops the heartbeat, stops the websocket client with a normal-closure status and
    /// disposes the stream subscriptions.
    /// </summary>
    public virtual async Task Close(){
        StopHeartbeat();
        try{
            await WebsocketClient.Stop(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "Shutdown");
        }finally{
            // Release the subscriptions even when stopping the client fails.
            foreach(IDisposable sub in Subscriptions){
                sub.Dispose();
            }
            Subscriptions.Clear();
        }
    }
}