104 lines
3.6 KiB
C#
104 lines
3.6 KiB
C#
using System.Reactive.Linq;
|
|
using Serilog;
|
|
using Websocket.Client;
|
|
|
|
namespace Discord.API;
|
|
|
|
public abstract class AbstractGateway {
|
|
protected readonly IWebsocketClient WebsocketClient;
|
|
protected readonly TimeProvider TimeProvider;
|
|
|
|
public AbstractGateway(IWebsocketClient client, TimeProvider time_provider){
|
|
this.WebsocketClient = client;
|
|
this.TimeProvider = time_provider;
|
|
WebsocketClient.DisconnectionHappened.Subscribe(DisconnectHandlerInternal);
|
|
WebsocketClient.ReconnectionHappened.Subscribe(ReconnectHandler);
|
|
WebsocketClient.MessageReceived.Subscribe(MessageReceivedHandler);
|
|
WebsocketClient.Start();
|
|
}
|
|
|
|
private void DisconnectHandlerInternal(DisconnectionInfo info){
|
|
StopHeartbeat();
|
|
DisconnectHandler(info);
|
|
}
|
|
|
|
protected abstract void DisconnectHandler(DisconnectionInfo info);
|
|
|
|
protected abstract void ReconnectHandler(ReconnectionInfo info);
|
|
|
|
protected abstract void MessageReceivedHandler(ResponseMessage message);
|
|
|
|
#region Heartbeat
|
|
|
|
private CancellationTokenSource? HeartbeatCts;
|
|
private CancellationTokenSource? InstantHeartbeatCts;
|
|
private DateTime? HeartbeatSent;
|
|
protected int HeartbeatPing {get; private set;}
|
|
private bool HeartbeatAcked;
|
|
|
|
/// <summary>
|
|
/// Sends a heartbeat packet to the remote
|
|
/// </summary>
|
|
protected abstract Task SendHeartbeat();
|
|
/// <summary>
|
|
/// Handler for when the heartbeat ack was not received
|
|
/// </summary>
|
|
/// <returns>Indicates if the heartbeat loop should exit</returns>
|
|
protected abstract Task<bool> MissingHeartbeatAckHandler();
|
|
|
|
private async Task HeartbeatTask(int heartbeat_interval){
|
|
CancellationToken ct = HeartbeatCts!.Token;
|
|
InstantHeartbeatCts = new();
|
|
HeartbeatSent = null;
|
|
await Task.Delay(
|
|
TimeSpan.FromMilliseconds(Random.Shared.Next(1, heartbeat_interval)),
|
|
TimeProvider,
|
|
InstantHeartbeatCts.Token);
|
|
|
|
if(ct.IsCancellationRequested) return; // Check if the task was cancelled when we were waiting
|
|
|
|
using PeriodicTimer pd = new(TimeSpan.FromMilliseconds(heartbeat_interval), TimeProvider);
|
|
HeartbeatAcked = true;
|
|
do{
|
|
if(!HeartbeatAcked){
|
|
if(await MissingHeartbeatAckHandler()){
|
|
break;
|
|
}
|
|
}
|
|
await SendHeartbeat();
|
|
HeartbeatSent = TimeProvider.GetUtcNow().DateTime;
|
|
HeartbeatAcked = false;
|
|
|
|
// Create new InstantHeartbeatCts if it was used
|
|
if(InstantHeartbeatCts.IsCancellationRequested) InstantHeartbeatCts = new();
|
|
} while(await pd.WaitForNextTickAsync(InstantHeartbeatCts.Token) && !ct.IsCancellationRequested);
|
|
}
|
|
|
|
protected int HeartbeatAckReceived(){
|
|
HeartbeatAcked = true;
|
|
if(HeartbeatSent == null){
|
|
Log.Warning("GATEWAY(abstract): HeartbeatAck received before heartbeat was sent");
|
|
return -1;
|
|
}
|
|
HeartbeatPing = (int)(TimeProvider.GetUtcNow().DateTime - HeartbeatSent.Value).TotalMilliseconds;
|
|
return HeartbeatPing;
|
|
}
|
|
|
|
protected void StartHeartbeat(int heartbeat_interval){
|
|
StopHeartbeat();
|
|
HeartbeatCts = new();
|
|
Task.Run(() => HeartbeatTask(heartbeat_interval), HeartbeatCts.Token);
|
|
}
|
|
|
|
protected void StopHeartbeat(){
|
|
HeartbeatCts?.Cancel();
|
|
InstantHeartbeatCts?.Cancel();
|
|
}
|
|
|
|
#endregion
|
|
|
|
public async Task Close(){
|
|
StopHeartbeat();
|
|
await WebsocketClient.Stop(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "Shutdown");
|
|
}
|
|
} |