Added events for forwarding packets

This commit is contained in:
Kecskeméti László 2024-07-24 11:30:55 +02:00
parent b01ba1e2cd
commit ad7597379a
4 changed files with 25 additions and 2 deletions

View File

@ -1,4 +1,5 @@
using Discord.API.Rest;
using System.Reactive.Subjects;
using Discord.API.Rest;
using Serilog;
namespace Discord.API;
@ -13,6 +14,10 @@ public class DiscordClient
private GatewayClient? Gateway;
private TimeProvider TimeProvider;
private readonly Subject<GatewayPacket> PacketStream = new Subject<GatewayPacket>();
public IObservable<GatewayPacket> PacketReceived => PacketStream;
private IDisposable? Subscription;
public DiscordClient(string api_key, Intents intents){
TimeProvider = TimeProvider.System;
this.Intents=intents;
@ -46,6 +51,7 @@ public class DiscordClient
Log.Information("DiscordClient: Gateway url: {gateway_url}", resp.Value.Url);
Gateway = new GatewayClient(resp.Value.Url, ApiKey, TimeProvider, (ulong)Intents);
Subscription = Gateway.PacketReceived.Subscribe(PacketStream);
Log.Information("DiscordClient: Started gateway");
}
@ -53,5 +59,6 @@ public class DiscordClient
public async Task Close(){
if(Gateway is not null)
await Gateway.Close();
Subscription?.Dispose();
}
}

View File

@ -98,7 +98,7 @@ public abstract class AbstractGateway {
#endregion
public async Task Close(){
public virtual async Task Close(){
StopHeartbeat();
await WebsocketClient.Stop(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "Shutdown");
foreach (var sub in Subscriptions)

View File

@ -1,4 +1,5 @@
using System.Collections.Specialized;
using System.Reactive.Subjects;
using System.Text.Json;
using Serilog;
using Websocket.Client;
@ -15,6 +16,9 @@ public class GatewayClient : AbstractGateway {
private string? SessionId = null;
private Uri StartingUri;
private Subject<GatewayPacket> PacketStream = new();
public IObservable<GatewayPacket> PacketReceived => PacketStream;
public GatewayClient(string url, string api_key, TimeProvider time_provider, ulong intents)
: this(url, api_key, time_provider, uri => new WebsocketClient(uri), intents)
{ }
@ -43,6 +47,7 @@ public class GatewayClient : AbstractGateway {
if((int?)info.CloseStatus is 4004 or 4008 or (>= 4010 and <= 4014)){
Log.Error("GATEWAY: Disconnection due to error: close code: {code}, desc: {desc}", info.CloseStatus, info.CloseStatusDescription);
info.CancelReconnection = true;
PacketStream.OnError(new Exception("Gateway failed to authenticate"));
}else if((int?)info.CloseStatus is 4007 or 4009){
SessionId = null;
Sequence = null;
@ -77,6 +82,8 @@ public class GatewayClient : AbstractGateway {
Log.Debug("GATEWAY: Packet not handled {opcode}", packet.Op);
break;
}
PacketStream.OnNext(packet);
}catch(Exception ex){
Log.Warning(ex, "GATEWAY: Error processing gateway event");
}
@ -160,4 +167,10 @@ public class GatewayClient : AbstractGateway {
Log.Debug("GATEWAY: Identifying for a new session");
}
}
public override async Task Close()
{
PacketStream.OnCompleted();
await base.Close();
}
}

View File

@ -9,6 +9,9 @@ Log.Logger=new LoggerConfiguration()
string api_key = File.ReadAllText("api_key.txt");
DiscordClient client = new DiscordClient(api_key, Intents.Guilds);
client.PacketReceived.Subscribe(packet=>{
Console.WriteLine($"Packet {packet.Op} was received");
});
Console.ReadLine();