mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-10-01 08:02:49 +02:00
216 lines
7.9 KiB
C#
216 lines
7.9 KiB
C#
using System.Net.Sockets;
|
|
using Phantom.Utils.Logging;
|
|
using Phantom.Utils.Rpc.Frame;
|
|
using Phantom.Utils.Rpc.Message;
|
|
using Serilog;
|
|
|
|
namespace Phantom.Utils.Rpc.Runtime.Client;
|
|
|
|
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IRpcConnectionProvider, IDisposable {
|
|
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(
|
|
string loggerName,
|
|
RpcClientConnectionParameters connectionParameters,
|
|
MessageRegistries<TClientToServerMessage, TServerToClientMessage> messageRegistries,
|
|
CancellationToken cancellationToken
|
|
) {
|
|
var connector = new RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageRegistries);
|
|
var connection = await connector.ConnectWithRetries(maxAttempts: 10, cancellationToken);
|
|
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, connector, connection);
|
|
}
|
|
|
|
private readonly string loggerName;
|
|
private readonly ILogger logger;
|
|
|
|
private readonly RpcCommonConnectionParameters connectionParameters;
|
|
private readonly RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage> connector;
|
|
private readonly IRpcFrameSenderProvider<TClientToServerMessage>.Mutable frameSenderProvider = new ();
|
|
|
|
private RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection currentConnection;
|
|
private readonly SemaphoreSlim currentConnectionSemaphore = new (1);
|
|
|
|
private Task? listenerTask;
|
|
|
|
private readonly CancellationTokenSource shutdownCancellationTokenSource = new ();
|
|
|
|
public MessageSender<TClientToServerMessage> MessageSender { get; }
|
|
|
|
private RpcClient(
|
|
string loggerName,
|
|
RpcCommonConnectionParameters connectionParameters,
|
|
RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage> connector,
|
|
RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection connection
|
|
) {
|
|
this.loggerName = loggerName;
|
|
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
|
|
|
this.connectionParameters = connectionParameters;
|
|
this.connector = connector;
|
|
this.currentConnection = connection;
|
|
|
|
this.MessageSender = new MessageSender<TClientToServerMessage>(loggerName, connectionParameters, frameSenderProvider);
|
|
}
|
|
|
|
async Task<RpcStream> IRpcConnectionProvider.GetStream(CancellationToken cancellationToken) {
|
|
return (await GetConnection(cancellationToken)).Stream;
|
|
}
|
|
|
|
private async Task<RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection> GetConnection(CancellationToken cancellationToken) {
|
|
await currentConnectionSemaphore.WaitAsync(cancellationToken);
|
|
try {
|
|
if (!currentConnection.Socket.Connected) {
|
|
currentConnection = await connector.ConnectWithRetries(cancellationToken);
|
|
}
|
|
|
|
return currentConnection;
|
|
} finally {
|
|
currentConnectionSemaphore.Release();
|
|
}
|
|
}
|
|
|
|
public void StartListening(IMessageReceiver<TServerToClientMessage> messageReceiver) {
|
|
if (listenerTask != null) {
|
|
throw new InvalidOperationException("Only one listener is allowed.");
|
|
}
|
|
|
|
listenerTask = Listen(messageReceiver);
|
|
}
|
|
|
|
private async Task Listen(IMessageReceiver<TServerToClientMessage> messageReceiver) {
|
|
CancellationToken cancellationToken = shutdownCancellationTokenSource.Token;
|
|
|
|
RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection? connection = null;
|
|
SessionState? sessionState = null;
|
|
|
|
try {
|
|
while (true) {
|
|
if (connection != null) {
|
|
try {
|
|
await connection.Shutdown();
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Caught exception closing the socket.");
|
|
}
|
|
|
|
await connection.DisposeAsync();
|
|
connection = null;
|
|
}
|
|
|
|
try {
|
|
connection = await GetConnection(cancellationToken);
|
|
} catch (OperationCanceledException) {
|
|
throw;
|
|
} catch (Exception e) {
|
|
logger.Warning(e, "Could not obtain a new connection.");
|
|
continue;
|
|
}
|
|
|
|
if (!sessionState.HasValue) {
|
|
sessionState = NewSessionState(connection, messageReceiver);
|
|
}
|
|
else if (connection.IsNewSession) {
|
|
await sessionState.Value.TryShutdownNow(logger);
|
|
sessionState = NewSessionState(connection, messageReceiver);
|
|
}
|
|
else {
|
|
sessionState.Value.Update(logger, connection);
|
|
}
|
|
|
|
try {
|
|
await IFrame.ReadFrom(connection.Stream, sessionState.Value.FrameReader, cancellationToken);
|
|
} catch (OperationCanceledException) {
|
|
throw;
|
|
} catch (EndOfStreamException) {
|
|
logger.Warning("Socket was closed.");
|
|
continue;
|
|
} catch (SocketException e) {
|
|
logger.Warning("Socket reading was interrupted. Socket error {ErrorCode} ({ErrorCodeName}), reason: {ErrorMessage}", e.ErrorCode, e.SocketErrorCode, e.Message);
|
|
continue;
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Socket reading was interrupted.");
|
|
continue;
|
|
}
|
|
|
|
logger.Information("Server closed session.");
|
|
}
|
|
} finally {
|
|
if (sessionState.HasValue) {
|
|
await sessionState.Value.TryShutdown(logger, sendSessionTermination: cancellationToken.IsCancellationRequested);
|
|
}
|
|
|
|
if (connection != null) {
|
|
try {
|
|
await connection.Disconnect();
|
|
} finally {
|
|
await connection.DisposeAsync();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private SessionState NewSessionState(RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection connection, IMessageReceiver<TServerToClientMessage> messageReceiver) {
|
|
var frameSender = new RpcFrameSender<TClientToServerMessage>(loggerName, connectionParameters, this, connection.MessageTypeMappings.ToServer, connection.PingInterval);
|
|
var messageHandler = new MessageHandler<TServerToClientMessage>(messageReceiver, frameSender);
|
|
var frameReader = new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, connection.MessageTypeMappings.ToClient, messageHandler, MessageSender, frameSender);
|
|
|
|
frameSenderProvider.SetNewValue(frameSender);
|
|
messageReceiver.OnSessionRestarted();
|
|
|
|
return new SessionState(frameSender, frameReader);
|
|
}
|
|
|
|
private readonly record struct SessionState(RpcFrameSender<TClientToServerMessage> FrameSender, RpcFrameReader<TClientToServerMessage, TServerToClientMessage> FrameReader) {
|
|
public void Update(ILogger logger, RpcClientToServerConnector<TClientToServerMessage, TServerToClientMessage>.Connection connection) {
|
|
TimeSpan currentPingInterval = FrameSender.PingInterval;
|
|
if (currentPingInterval != connection.PingInterval) {
|
|
logger.Warning("Server requested a different ping interval ({ServerPingInterval}s) than currently set ({ClientPingInterval}s), but ping interval cannot be updated for existing sessions.", connection.PingInterval.TotalSeconds, currentPingInterval.TotalSeconds);
|
|
}
|
|
}
|
|
|
|
public async Task TryShutdown(ILogger logger, bool sendSessionTermination) {
|
|
try {
|
|
await FrameSender.Shutdown(sendSessionTermination);
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Caught exception while shutting down frame sender.");
|
|
}
|
|
}
|
|
|
|
public async Task TryShutdownNow(ILogger logger) {
|
|
try {
|
|
await FrameSender.ShutdownNow();
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Caught exception while immediately shutting down frame sender.");
|
|
}
|
|
}
|
|
}
|
|
|
|
public async Task Shutdown() {
|
|
logger.Information("Shutting down client...");
|
|
|
|
try {
|
|
await MessageSender.Close();
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Caught exception while closing message sender.");
|
|
}
|
|
|
|
await shutdownCancellationTokenSource.CancelAsync();
|
|
|
|
if (listenerTask != null) {
|
|
try {
|
|
await listenerTask;
|
|
} catch (OperationCanceledException) {
|
|
// Ignore.
|
|
} catch (Exception e) {
|
|
logger.Error(e, "Caught exception in listener.");
|
|
} finally {
|
|
listenerTask = null;
|
|
}
|
|
}
|
|
|
|
logger.Information("Client shut down.");
|
|
}
|
|
|
|
public void Dispose() {
|
|
currentConnectionSemaphore.Dispose();
|
|
shutdownCancellationTokenSource.Dispose();
|
|
}
|
|
}
|