Gateway heartbeat works

This commit is contained in:
Kecskeméti László 2024-07-22 12:25:27 +02:00
parent 6df9346de1
commit be80cad0cc
6 changed files with 117 additions and 10 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@
## Get latest from `dotnet new gitignore`
Generated/
*.txt
# dotenv files
.env

View File

@ -5,6 +5,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>Discord.API</RootNamespace>
<PublishAot>true</PublishAot>
</PropertyGroup>
<PropertyGroup>

View File

@ -0,0 +1,46 @@
using Discord.API.Rest;
using Serilog;
namespace Discord.API;
public class DiscordClient
{
private static readonly string ApiBaseUrl = "https://discord.com/api/v10"; //TODO Export version number to a separate constant
private string ApiKey;
private RestClient Rest;
private GatewayClient? Gateway;
private TimeProvider TimeProvider;
public DiscordClient(string api_key) : this(api_key, TimeProvider.System){
}
internal DiscordClient(string api_key, TimeProvider time_provider){
this.TimeProvider=time_provider;
this.ApiKey=api_key;
Rest = new RestClient(ApiBaseUrl, ApiKey);
Task.Run(ConnectGateway);
}
private async Task ConnectGateway(){
RestResponse<GetGatewayResponse> resp = await Rest.GetGateway();
while(resp is not RestSuccessResponse<GetGatewayResponse>){
Log.Debug("DiscordClient: Failed to get gateway url");
if(resp is RestErrorResponse<GetGatewayResponse> error){
Log.Error("DiscordClient: Get gateway returned error: {error_msg}", error.Error.Message);
break;
}else{
Log.Information("DiscordClient: Get gateway failed: {ex}; Retrying later", (resp as RestFailResponse<GetGatewayResponse>)!.Exception?.Message);
}
await Task.Delay(TimeSpan.FromMilliseconds(2000), TimeProvider, cancellationToken: default);
resp = await Rest.GetGateway();
}
Log.Information("DiscordClient: Gateway url: {gateway_url}", resp.Value.Url);
Gateway = new GatewayClient(resp.Value.Url, ApiKey, TimeProvider);
Log.Information("DiscordClient: Started gateway");
}
}

View File

@ -1,4 +1,5 @@
using System.Collections.Specialized;
using System.Security.Cryptography;
using System.Text.Json;
using Serilog;
using Websocket.Client;
@ -10,26 +11,35 @@ public class GatewayClient {
private const int ApiVersion = 10;
private string ApiKey;
private WebsocketClient Websocket;
private CancellationTokenSource? heartbeat_cts;
private IWebsocketClient Websocket;
private CancellationTokenSource? HeartbeatCts;
private ulong? Sequence = null;
private DateTime? HeartbeatAckReceived = null;
private TimeProvider timeProvider;
public GatewayClient(string url, string api_key){
public GatewayClient(string url, string api_key, TimeProvider time_provider)
: this(url, api_key, time_provider, uri => new WebsocketClient(uri))
{ }
internal GatewayClient(string url, string api_key, TimeProvider time_provider, Func<Uri, IWebsocketClient> websocket_client_factory){
this.timeProvider=time_provider;
this.ApiKey=api_key;
UriBuilder uriBuilder = new(url);
NameValueCollection query = System.Web.HttpUtility.ParseQueryString("");
query.Add("v", ApiVersion.ToString());
query.Add("encoding", "json");
uriBuilder.Query=query.ToString();
Websocket = new WebsocketClient(uriBuilder.Uri);
Websocket = websocket_client_factory.Invoke(uriBuilder.Uri);
Websocket.DisconnectionHappened.Subscribe(DisconnectionHandler);
Websocket.ReconnectionHappened.Subscribe(ReconnectionHandler);
Websocket.MessageReceived.Subscribe(MessageReceivedHandler);
Websocket.Start();
Log.Debug("GATEWAY: Created new gateway, with url: {url}", uriBuilder.ToString());
}
private void DisconnectionHandler(DisconnectionInfo info){
StopHeartbeat();
Log.Information("GATEWAY: Disconnected. Type: {DisconnectionType}", info.Type);
}
private void ReconnectionHandler(ReconnectionInfo info){
@ -41,10 +51,18 @@ public class GatewayClient {
GatewayPacket packet = JsonSerializer.Deserialize(msg.Text!, SourceGenerationContext.Default.GatewayPacket)
?? throw new Exception("Failed to deserialize packet"); // This can be optimized //TODO
Log.Debug("GATEWAY: Packet received");
switch(packet){
case HelloPacket helloPacket:
HelloPacketHandler(helloPacket);
break;
case HeartbeatAckPacket:
HeartbeatAckHandler();
break;
default:
Log.Debug("GATEWAY: Packet not handled");
break;
}
}catch(Exception ex){
Log.Warning(ex, "GATEWAY: Error processing gateway event");
@ -53,16 +71,30 @@ public class GatewayClient {
private void HelloPacketHandler(HelloPacket packet){
StartHeartbeat((int)packet.Data.HeartbeatInterval);
Log.Debug("GATEWAY: Hello packet received, heartbeat interval: {heartbeat_ms}", packet.Data.HeartbeatInterval);
}
private void HeartbeatAckHandler(){
HeartbeatAckReceived = timeProvider.GetUtcNow().DateTime;
Log.Debug("GATEWAY: Heartbeat ACK received");
}
private void StartHeartbeat(int heartbeat_interval){
heartbeat_cts?.Cancel();
heartbeat_cts = new CancellationTokenSource();
CancellationToken ct = heartbeat_cts.Token;
HeartbeatCts?.Cancel();
HeartbeatCts = new CancellationTokenSource();
CancellationToken ct = HeartbeatCts.Token;
Task.Run(async ()=>{
await Task.Delay(Random.Shared.Next(1, heartbeat_interval));
using PeriodicTimer pd = new(TimeSpan.FromMilliseconds(heartbeat_interval));
using PeriodicTimer pd = new(TimeSpan.FromMilliseconds(heartbeat_interval), timeProvider);
HeartbeatAckReceived = timeProvider.GetUtcNow().DateTime;
DateTime HeartbeatSent = timeProvider.GetUtcNow().DateTime;
do{
if(HeartbeatAckReceived == null){
Log.Debug("GATEWAY: Heartbeat ack not received. Reconnecting.");
_ = Websocket.Reconnect();
break;
}
Log.Information("GATEWAY: Heartbeat ping time is {time_ms} ms", (HeartbeatAckReceived.Value - HeartbeatSent).TotalMilliseconds);
HeartbeatPacket packet = new HeartbeatPacket(){
Sequence=this.Sequence
};
@ -70,7 +102,14 @@ public class GatewayClient {
if(!Websocket.Send(JsonSerializer.Serialize(packet, SourceGenerationContext.Default.HeartbeatPacket))){
Log.Warning("GATEWAY: Failed to queue heartbeat message");
}
HeartbeatSent = timeProvider.GetUtcNow().DateTime;
HeartbeatAckReceived = null;
Log.Debug("GATEWAY: Heartbeat sent");
}while(await pd.WaitForNextTickAsync(ct) && !ct.IsCancellationRequested);
});
}
private void StopHeartbeat(){
HeartbeatCts?.Cancel();
}
}

View File

@ -1,2 +1,13 @@
// See https://aka.ms/new-console-template for more information
Console.WriteLine("Hello, World!");
using Discord.API;
using Serilog;
Log.Logger=new LoggerConfiguration()
.WriteTo.Console(Serilog.Events.LogEventLevel.Verbose)
.MinimumLevel.Verbose()
.CreateLogger();
string api_key = File.ReadAllText("api_key.txt");
DiscordClient client = new DiscordClient(api_key);
Console.ReadLine();

View File

@ -1,5 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\Discord.API\Discord.API.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="4.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>