using System.Collections.Specialized;
using System.Reactive.Subjects;
using System.Text.Json;
using Serilog;
using Websocket.Client;

namespace Discord.API;

/// <summary>
/// Gateway connection that, after the server's hello packet, either identifies a new session
/// or resumes the previous one, tracks the last dispatch sequence number, and republishes
/// every decoded packet through <see cref="PacketReceived"/>.
/// </summary>
public class GatewayClient : AbstractGateway
{
    private const int ApiVersion = 10;

    private readonly string _apiKey;
    private readonly ulong _intents;

    /// <summary>Url the socket was first created with; restored whenever the session is reset.</summary>
    private readonly Uri _startingUri;

    // NOTE(review): element type inferred from the OnNext(GatewayPacket) call in
    // MessageReceivedHandler - confirm against the original declaration.
    private readonly Subject<GatewayPacket> _packetStream = new();

    /// <summary>Sequence number of the last dispatch packet; null until one is received or after a reset.</summary>
    private ulong? _sequence;

    /// <summary>Session id taken from the ready packet; null until ready or after a reset.</summary>
    private string? _sessionId;

    /// <summary>
    /// Stream of every packet that was successfully deserialized. It completes when
    /// <see cref="Close"/> is called and errors when the socket is closed with one of the
    /// close codes that <see cref="DisconnectHandler"/> treats as fatal.
    /// </summary>
    public IObservable<GatewayPacket> PacketReceived => _packetStream;

    /// <summary>Creates a gateway backed by a real <see cref="WebsocketClient"/>.</summary>
    /// <param name="url">Base gateway url; the version and encoding query is added by this class.</param>
    /// <param name="api_key">Token sent in the identify and resume packets.</param>
    /// <param name="time_provider">Time source forwarded to the base class.</param>
    /// <param name="intents">Intents value sent in the identify packet.</param>
    public GatewayClient(string url, string api_key, TimeProvider time_provider, ulong intents)
        : this(url, api_key, time_provider, uri => new WebsocketClient(uri), intents)
    {
    }

    /// <summary>Creates a gateway whose socket is produced by <paramref name="websocket_client_factory"/>.</summary>
    /// <param name="url">Base gateway url; the version and encoding query is added by this class.</param>
    /// <param name="api_key">Token sent in the identify and resume packets.</param>
    /// <param name="time_provider">Time source forwarded to the base class.</param>
    /// <param name="websocket_client_factory">Builds the socket for the fully-built gateway url.</param>
    /// <param name="intents">Intents value sent in the identify packet.</param>
    // NOTE(review): the factory's return type is assumed to be IWebsocketClient; it must match
    // whatever the AbstractGateway constructor accepts - confirm.
    internal GatewayClient(
        string url,
        string api_key,
        TimeProvider time_provider,
        Func<Uri, IWebsocketClient> websocket_client_factory,
        ulong intents)
        : base(websocket_client_factory.Invoke(BuildUrl(url)), time_provider)
    {
        _apiKey = api_key;
        _intents = intents;
        _startingUri = WebsocketClient.Url;
        Log.Debug("GATEWAY: Created new gateway, with url: {url}", WebsocketClient.Url);
    }

    /// <summary>
    /// Returns <paramref name="url"/> with its query replaced by <c>v=&lt;ApiVersion&gt;&amp;encoding=json</c>.
    /// Any query already present on <paramref name="url"/> is discarded.
    /// </summary>
    private static Uri BuildUrl(string url)
    {
        NameValueCollection query = System.Web.HttpUtility.ParseQueryString("");
        query.Add("v", ApiVersion.ToString(System.Globalization.CultureInfo.InvariantCulture));
        query.Add("encoding", "json");

        UriBuilder builder = new(url) { Query = query.ToString() };
        return builder.Uri;
    }

    /// <summary>Forgets the current session so that the next login identifies from the starting url.</summary>
    private void ResetSession()
    {
        _sessionId = null;
        _sequence = null;
        WebsocketClient.Url = _startingUri;
    }

    /// <summary>
    /// Logs the disconnection and reacts to the close code: fatal codes cancel the automatic
    /// reconnection and fail <see cref="PacketReceived"/>; codes that invalidate the session
    /// reset it so the next connection identifies instead of resuming.
    /// </summary>
    protected override void DisconnectHandler(DisconnectionInfo info)
    {
        Log.Information("GATEWAY: Disconnected. Type: {DisconnectionType}", info.Type);
        Log.Debug("GATEWAY: Connection closed due to {code} {description}", info.CloseStatus, info.CloseStatusDescription);

        switch ((int?)info.CloseStatus)
        {
            // Discord close codes: 4004 authentication failed, 4008 rate limited,
            // 4010-4014 invalid shard / sharding required / invalid API version /
            // invalid intents / disallowed intents. Retrying cannot succeed, so stop.
            case 4004 or 4008 or (>= 4010 and <= 4014):
                Log.Error("GATEWAY: Disconnection due to error: close code: {code}, desc: {desc}", info.CloseStatus, info.CloseStatusDescription);
                info.CancelReconnection = true;
                _packetStream.OnError(new Exception("Gateway failed to authenticate"));
                break;

            // 4007 invalid sequence, 4009 session timed out: the session cannot be resumed.
            case 4007 or 4009:
                ResetSession();
                break;
        }
    }

    /// <summary>Logs every (re)connection together with the url that was used.</summary>
    protected override void ReconnectHandler(ReconnectionInfo info)
    {
        Log.Information("GATEWAY: (Re)Connected to server. Url: {url}, Type: {Type}", WebsocketClient.Url, info.Type);
    }

    /// <summary>
    /// Deserializes a text frame into a <see cref="GatewayPacket"/>, runs the handler for its
    /// concrete type and then publishes it on <see cref="PacketReceived"/>. Non-text frames are
    /// ignored. Any exception raised while decoding, handling or publishing (including one thrown
    /// by a subscriber) is logged as a warning and swallowed so the socket keeps running.
    /// </summary>
    protected override void MessageReceivedHandler(ResponseMessage msg)
    {
        if (msg.MessageType != System.Net.WebSockets.WebSocketMessageType.Text)
        {
            return;
        }

        try
        {
            // TODO: this can be optimized.
            GatewayPacket packet =
                JsonSerializer.Deserialize(msg.Text!, SourceGenerationContext.Default.GatewayPacket)
                ?? throw new JsonException("Failed to deserialize packet");

            Log.Debug("GATEWAY: Packet received, opcode: {opcode}", packet.Op);

            switch (packet)
            {
                case HelloPacket hello:
                    HelloPacketHandler(hello);
                    break;
                case HeartbeatAckPacket:
                    HeartbeatAckHandler();
                    break;
                case DispatchPacket dispatch:
                    DispatchHandler(dispatch);
                    break;
                case InvalidSessionPacket invalidSession:
                    InvalidSessionHandler(invalidSession);
                    break;
                case HeartbeatPacket:
                    HeartbeatPacketHandler();
                    break;
                default:
                    Log.Debug("GATEWAY: Packet not handled {opcode}", packet.Op);
                    break;
            }

            _packetStream.OnNext(packet);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "GATEWAY: Error processing gateway event");
        }
    }

    /// <summary>Starts heartbeating at the interval announced by the server, then logs in.</summary>
    private void HelloPacketHandler(HelloPacket packet)
    {
        StartHeartbeat((int)packet.Data.HeartbeatInterval);
        Log.Debug("GATEWAY: Hello packet received, heartbeat interval: {heartbeat_ms}", packet.Data.HeartbeatInterval);
        Login();
    }

    /// <summary>Notifies the base class of the ack and logs the value it returns as the ping.</summary>
    private void HeartbeatAckHandler()
    {
        Log.Debug("GATEWAY: Heartbeat ACK received");
        Log.Information("GATEWAY: Heartbeat ping: {ping_ms} ms", HeartbeatAckReceived());
    }

    /// <summary>Records the dispatch sequence number and forwards ready packets to <see cref="ReadyHandler"/>.</summary>
    private void DispatchHandler(DispatchPacket packet)
    {
        Log.Debug("GATEWAY: Event: {event}, seq: {sequence}", packet.Event, packet.Sequence);
        _sequence = packet.Sequence;

        if (packet is ReadyPacket ready)
        {
            ReadyHandler(ready);
        }
    }

    /// <summary>
    /// Reconnects after the server invalidated the session. When the packet does not flag the
    /// session as resumable the stored session is wiped first, so the following hello triggers a
    /// fresh identify; otherwise the stored session is kept and the following hello resumes it.
    /// </summary>
    private void InvalidSessionHandler(InvalidSessionPacket packet)
    {
        Log.Debug("GATEWAY: Invalid session resumable: {resumable}", packet.Resume);

        if (packet.Resume != true)
        {
            ResetSession();
        }

        // Reconnect in both cases: previously a resumable invalid session was ignored, which
        // left the socket open without ever re-authenticating.
        _ = WebsocketClient.Reconnect();
    }

    /// <summary>Stores the session id and switches the socket to the resume url for later reconnects.</summary>
    private void ReadyHandler(ReadyPacket packet)
    {
        _sessionId = packet.Data.SessionId;
        WebsocketClient.Url = BuildUrl(packet.Data.ResumeGatewayUrl);
        Log.Debug("GATEWAY: Resume url: {url}", packet.Data.ResumeGatewayUrl);
    }

    /// <summary>Answers a server heartbeat request by asking the base class for an immediate heartbeat.</summary>
    private void HeartbeatPacketHandler()
    {
        ImmediateHeartbeat();
        Log.Debug("GATEWAY: Remote requested immediate heartbeat");
    }

    /// <summary>Sends a heartbeat packet carrying the last received sequence number (or null).</summary>
    protected override Task SendHeartbeat()
    {
        HeartbeatPacket packet = new() { Sequence = _sequence };
        WebsocketClient.Send(JsonSerializer.Serialize(packet, SourceGenerationContext.Default.HeartbeatPacket));
        Log.Debug("GATEWAY: Heartbeat sent");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Forces a reconnect. Session state is left untouched, so the next hello resumes when a
    /// session is stored. Presumably invoked by the base class when a heartbeat ack is overdue -
    /// see AbstractGateway.
    /// </summary>
    protected override Task MissingHeartbeatAckHandler()
    {
        _ = WebsocketClient.Reconnect();
        Log.Debug("GATEWAY: Heartbeat ack missed. Reconnecting");
        return Task.FromResult(true);
    }

    /// <summary>
    /// Sends a resume packet when both a session id and a sequence number are stored,
    /// otherwise sends an identify packet with the configured intents.
    /// </summary>
    private void Login()
    {
        if (_sessionId is { } sessionId && _sequence is { } sequence)
        {
            ResumePacket resume = new()
            {
                Data = new()
                {
                    Token = _apiKey,
                    Sequence = sequence,
                    SessionId = sessionId
                }
            };
            WebsocketClient.Send(JsonSerializer.Serialize(resume, SourceGenerationContext.Default.ResumePacket));
            Log.Debug("GATEWAY: Resuming previus session");
            return;
        }

        IdentifyPacket identify = new()
        {
            Data = new()
            {
                Intents = _intents,
                Token = _apiKey
            }
        };
        WebsocketClient.Send(JsonSerializer.Serialize(identify, SourceGenerationContext.Default.IdentifyPacket));
        Log.Debug("GATEWAY: Identifying for a new session");
    }

    /// <summary>Completes <see cref="PacketReceived"/> and then closes the underlying gateway.</summary>
    public override async Task Close()
    {
        _packetStream.OnCompleted();
        await base.Close();
    }
}