diff --git a/Discord.API/DiscordClient.cs b/Discord.API/DiscordClient.cs index 0892db4..a289ed2 100644 --- a/Discord.API/DiscordClient.cs +++ b/Discord.API/DiscordClient.cs @@ -1,4 +1,5 @@ -using Discord.API.Rest; +using System.Reactive.Subjects; +using Discord.API.Rest; using Serilog; namespace Discord.API; @@ -13,6 +14,10 @@ public class DiscordClient private GatewayClient? Gateway; private TimeProvider TimeProvider; + private readonly Subject PacketStream = new Subject(); + public IObservable PacketReceived => PacketStream; + private IDisposable? Subscription; + public DiscordClient(string api_key, Intents intents){ TimeProvider = TimeProvider.System; this.Intents=intents; @@ -46,6 +51,7 @@ public class DiscordClient Log.Information("DiscordClient: Gateway url: {gateway_url}", resp.Value.Url); Gateway = new GatewayClient(resp.Value.Url, ApiKey, TimeProvider, (ulong)Intents); + Subscription = Gateway.PacketReceived.Subscribe(PacketStream); Log.Information("DiscordClient: Started gateway"); } @@ -53,5 +59,6 @@ public class DiscordClient public async Task Close(){ if(Gateway is not null) await Gateway.Close(); + Subscription?.Dispose(); } } diff --git a/Discord.API/Gateway/AbstractGateway.cs b/Discord.API/Gateway/AbstractGateway.cs index 16943e4..98a097b 100644 --- a/Discord.API/Gateway/AbstractGateway.cs +++ b/Discord.API/Gateway/AbstractGateway.cs @@ -98,7 +98,7 @@ public abstract class AbstractGateway { #endregion - public async Task Close(){ + public virtual async Task Close(){ StopHeartbeat(); await WebsocketClient.Stop(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "Shutdown"); foreach (var sub in Subscriptions) diff --git a/Discord.API/Gateway/GatewayClient.cs b/Discord.API/Gateway/GatewayClient.cs index 933a379..d0223df 100644 --- a/Discord.API/Gateway/GatewayClient.cs +++ b/Discord.API/Gateway/GatewayClient.cs @@ -1,4 +1,5 @@ using System.Collections.Specialized; +using System.Reactive.Subjects; using System.Text.Json; using Serilog; using Websocket.Client; @@ -15,6 +16,9 @@ public class GatewayClient : AbstractGateway { private string? SessionId = null; private Uri StartingUri; + private Subject PacketStream = new(); + public IObservable PacketReceived => PacketStream; + public GatewayClient(string url, string api_key, TimeProvider time_provider, ulong intents) : this(url, api_key, time_provider, uri => new WebsocketClient(uri), intents) { } @@ -43,6 +47,7 @@ public class GatewayClient : AbstractGateway { if((int?)info.CloseStatus is 4004 or 4008 or (>= 4010 and <= 4014)){ Log.Error("GATEWAY: Disconnection due to error: close code: {code}, desc: {desc}", info.CloseStatus, info.CloseStatusDescription); info.CancelReconnection = true; + PacketStream.OnError(new Exception("Gateway failed to authenticate")); }else if((int?)info.CloseStatus is 4007 or 4009){ SessionId = null; Sequence = null; @@ -77,6 +82,8 @@ public class GatewayClient : AbstractGateway { Log.Debug("GATEWAY: Packet not handled {opcode}", packet.Op); break; } + + PacketStream.OnNext(packet); }catch(Exception ex){ Log.Warning(ex, "GATEWAY: Error processing gateway event"); } @@ -160,4 +167,10 @@ public class GatewayClient : AbstractGateway { Log.Debug("GATEWAY: Identifying for a new session"); } } + + public override async Task Close() + { + PacketStream.OnCompleted(); + await base.Close(); + } } diff --git a/example_bot/Program.cs b/example_bot/Program.cs index e768e67..d006b73 100644 --- a/example_bot/Program.cs +++ b/example_bot/Program.cs @@ -9,6 +9,9 @@ Log.Logger=new LoggerConfiguration() string api_key = File.ReadAllText("api_key.txt"); DiscordClient client = new DiscordClient(api_key, Intents.Guilds); +client.PacketReceived.Subscribe(packet=>{ + Console.WriteLine($"Packet {packet.Op} was received"); +}); Console.ReadLine();