mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-17 21:24:49 +02:00
Compare commits
22 Commits
wip-rpc
...
61e6a3b54c
Author | SHA1 | Date | |
---|---|---|---|
61e6a3b54c
|
|||
f4972a16cd
|
|||
7ec9f3db91
|
|||
61e847c366
|
|||
1f6cc0532a
|
|||
cf609a02f8
|
|||
23f9e079c1
|
|||
8f8e0cf2e0
|
|||
a65f22f97d
|
|||
9a4ac4604e
|
|||
a8fe48ec77
|
|||
62d10f248d
|
|||
3f8e46ae2d
|
|||
584e8acfd0
|
|||
47e563f7ce
|
|||
29c403f2d4
|
|||
2947fa3522
|
|||
4d1a79307f
|
|||
cc4eb8aa9c
|
|||
de1767c876
|
|||
39f2fa4b17
|
|||
19e0d6fd3d
|
@@ -19,8 +19,6 @@ public sealed class AgentServices {
|
||||
|
||||
public ActorSystem ActorSystem { get; }
|
||||
|
||||
private ControllerConnection ControllerConnection { get; }
|
||||
|
||||
private AgentInfo AgentInfo { get; }
|
||||
private AgentFolders AgentFolders { get; }
|
||||
private AgentState AgentState { get; }
|
||||
@@ -33,8 +31,6 @@ public sealed class AgentServices {
|
||||
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) {
|
||||
this.ActorSystem = ActorSystemFactory.Create("Agent");
|
||||
|
||||
this.ControllerConnection = controllerConnection;
|
||||
|
||||
this.AgentInfo = agentInfo;
|
||||
this.AgentFolders = agentFolders;
|
||||
this.AgentState = new AgentState();
|
||||
@@ -53,14 +49,14 @@ public sealed class AgentServices {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> Register(CancellationToken cancellationToken) {
|
||||
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
||||
Logger.Information("Registering with the controller...");
|
||||
|
||||
// TODO NEED TO SEND WHEN SERVER RESTARTS!!!
|
||||
|
||||
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
|
||||
try {
|
||||
configureInstanceMessages = await ControllerConnection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||
} catch (Exception e) {
|
||||
Logger.Fatal(e, "Registration failed.");
|
||||
return false;
|
||||
@@ -82,7 +78,7 @@ public sealed class AgentServices {
|
||||
}
|
||||
}
|
||||
|
||||
await ControllerConnection.Send(new AdvertiseJavaRuntimesMessage(JavaRuntimeRepository.All), cancellationToken);
|
||||
await connection.Send(new AdvertiseJavaRuntimesMessage(JavaRuntimeRepository.All), cancellationToken);
|
||||
InstanceTicketManager.RefreshAgentStatus();
|
||||
|
||||
return true;
|
||||
|
@@ -62,12 +62,14 @@ try {
|
||||
return 1;
|
||||
}
|
||||
|
||||
var controllerConnection = new ControllerConnection(rpcClient.SendChannel);
|
||||
|
||||
Task? rpcClientListener = null;
|
||||
try {
|
||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||
|
||||
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), controllerConnection);
|
||||
await agentServices.Initialize();
|
||||
|
||||
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(agentServices);
|
||||
@@ -75,21 +77,18 @@ try {
|
||||
|
||||
rpcClientListener = rpcClient.Listen(new IMessageReceiver<IMessageToAgent>.Actor(rpcMessageHandlerActor));
|
||||
|
||||
if (await agentServices.Register(shutdownCancellationToken)) {
|
||||
if (await agentServices.Register(controllerConnection, shutdownCancellationToken)) {
|
||||
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
|
||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||
}
|
||||
|
||||
await agentServices.Shutdown();
|
||||
} finally {
|
||||
PhantomLogger.Root.Information("Unregistering agent...");
|
||||
try {
|
||||
using var unregisterCancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
await rpcClient.SendChannel.SendMessage(new UnregisterAgentMessage(), unregisterCancellationTokenSource.Token);
|
||||
} catch (OperationCanceledException) {
|
||||
PhantomLogger.Root.Warning("Could not unregister agent after shutdown.");
|
||||
await controllerConnection.Send(new UnregisterAgentMessage(), CancellationToken.None);
|
||||
// TODO wait for acknowledgment
|
||||
} catch (Exception e) {
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister agent during shutdown.");
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
||||
} finally {
|
||||
await rpcClient.Shutdown();
|
||||
|
||||
|
@@ -14,7 +14,7 @@ sealed class AgentConnection(Guid agentGuid, string agentName) {
|
||||
|
||||
public void UpdateConnection(RpcServerToClientConnection<IMessageToController, IMessageToAgent> newConnection, string newAgentName) {
|
||||
lock (this) {
|
||||
connection?.ClientClosedSession();
|
||||
connection?.CloseSession();
|
||||
connection = newConnection;
|
||||
agentName = newAgentName;
|
||||
}
|
||||
@@ -23,7 +23,7 @@ sealed class AgentConnection(Guid agentGuid, string agentName) {
|
||||
public bool CloseIfSame(RpcServerToClientConnection<IMessageToController, IMessageToAgent> expected) {
|
||||
lock (this) {
|
||||
if (connection != null && ReferenceEquals(connection, expected)) {
|
||||
connection.ClientClosedSession();
|
||||
connection.CloseSession();
|
||||
connection = null;
|
||||
return true;
|
||||
}
|
||||
|
@@ -1,5 +1,4 @@
|
||||
using System.Collections.Immutable;
|
||||
using Akka.Actor;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
@@ -32,7 +31,7 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
this.eventLogManager = init.EventLogManager;
|
||||
|
||||
ReceiveAsyncAndReply<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(HandleRegisterAgent);
|
||||
ReceiveAsync<UnregisterAgentMessage>(HandleUnregisterAgent);
|
||||
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
|
||||
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
|
||||
Receive<ReportAgentStatusMessage>(HandleReportAgentStatus);
|
||||
Receive<ReportInstanceStatusMessage>(HandleReportInstanceStatus);
|
||||
@@ -50,13 +49,11 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
return agentManager.RegisterAgent(message.AgentInfo, connection);
|
||||
}
|
||||
|
||||
private Task HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||
Guid agentGuid = RequireAgentGuid();
|
||||
agentManager.TellAgent(agentGuid, new AgentActor.UnregisterCommand(connection));
|
||||
agentManager.OnSessionClosed(connection.SessionId, agentGuid);
|
||||
|
||||
Self.Tell(PoisonPill.Instance);
|
||||
return connection.ClientClosedSession();
|
||||
connection.CloseSession();
|
||||
}
|
||||
|
||||
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
||||
|
@@ -27,7 +27,7 @@ sealed class WebClientRegistrar(
|
||||
) : IRpcServerClientRegistrar<IMessageToController, IMessageToWeb> {
|
||||
public IMessageReceiver<IMessageToController> Register(RpcServerToClientConnection<IMessageToController, IMessageToWeb> connection) {
|
||||
var name = "WebClient-" + connection.SessionId;
|
||||
var init = new WebMessageHandlerActor.Init(connection, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
||||
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
||||
return new IMessageReceiver<IMessageToController>.Actor(actorSystem.ActorOf(WebMessageHandlerActor.Factory(init), name));
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +1,4 @@
|
||||
using System.Collections.Immutable;
|
||||
using Akka.Actor;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Java;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
@@ -24,6 +23,7 @@ namespace Phantom.Controller.Services.Rpc;
|
||||
sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
public readonly record struct Init(
|
||||
RpcServerToClientConnection<IMessageToController, IMessageToWeb> Connection,
|
||||
WebClientRegistrar WebClientRegistrar,
|
||||
ControllerState ControllerState,
|
||||
InstanceLogManager InstanceLogManager,
|
||||
UserManager UserManager,
|
||||
@@ -88,8 +88,7 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
}
|
||||
|
||||
private Task HandleUnregisterWeb(UnregisterWebMessage message) {
|
||||
Self.Tell(PoisonPill.Instance);
|
||||
return connection.ClientClosedSession();
|
||||
return connection.CloseSession();
|
||||
}
|
||||
|
||||
private Task<Optional<LogInSuccess>> HandleLogIn(LogInMessage message) {
|
||||
|
@@ -24,7 +24,7 @@ interface IFrame {
|
||||
switch (oneByteBuffer[0]) {
|
||||
case TypePingId:
|
||||
var pingTime = await PingFrame.Read(stream, cancellationToken);
|
||||
await reader.OnPingFrame(pingTime, cancellationToken);
|
||||
await reader.OnPing(pingTime, cancellationToken);
|
||||
break;
|
||||
|
||||
case TypePongId:
|
||||
@@ -48,7 +48,7 @@ interface IFrame {
|
||||
break;
|
||||
|
||||
default:
|
||||
reader.OnUnknownFrameId(oneByteBuffer[0]);
|
||||
reader.OnUnknownFrameStart(oneByteBuffer[0]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@@ -3,10 +3,10 @@
|
||||
namespace Phantom.Utils.Rpc.Frame;
|
||||
|
||||
interface IFrameReader {
|
||||
ValueTask OnPingFrame(DateTimeOffset pingTime, CancellationToken cancellationToken);
|
||||
ValueTask OnPing(DateTimeOffset pingTime, CancellationToken cancellationToken);
|
||||
void OnPongFrame(PongFrame frame);
|
||||
Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken);
|
||||
void OnReplyFrame(ReplyFrame frame);
|
||||
void OnErrorFrame(ErrorFrame frame);
|
||||
void OnUnknownFrameId(byte frameId);
|
||||
void OnUnknownFrameStart(byte id);
|
||||
}
|
||||
|
@@ -1,13 +0,0 @@
|
||||
using Phantom.Utils.Collections;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
sealed class MessageReceiveTracker {
|
||||
private readonly RangeSet<uint> receivedMessageIds = new ();
|
||||
|
||||
public bool ReceiveMessage(uint messageId) {
|
||||
lock (receivedMessageIds) {
|
||||
return receivedMessageIds.Add(messageId);
|
||||
}
|
||||
}
|
||||
}
|
@@ -13,17 +13,15 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
|
||||
|
||||
private readonly string loggerName;
|
||||
private readonly ILogger logger;
|
||||
private readonly MessageRegistry<TServerToClientMessage> messageRegistry;
|
||||
private readonly MessageReceiveTracker messageReceiveTracker = new ();
|
||||
private readonly MessageRegistry<TServerToClientMessage> serverToClientMessageRegistry;
|
||||
private readonly RpcClientToServerConnection connection;
|
||||
private readonly CancellationTokenSource shutdownCancellationTokenSource = new ();
|
||||
|
||||
public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
|
||||
|
||||
private RpcClient(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientToServerConnector connector, RpcClientToServerConnector.Connection connection) {
|
||||
this.loggerName = loggerName;
|
||||
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||
this.messageRegistry = messageDefinitions.ToClient;
|
||||
this.serverToClientMessageRegistry = messageDefinitions.ToClient;
|
||||
|
||||
this.connection = new RpcClientToServerConnection(loggerName, connector, connection);
|
||||
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters.Common, this.connection, messageDefinitions.ToServer);
|
||||
@@ -31,9 +29,9 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
|
||||
|
||||
public async Task Listen(IMessageReceiver<TServerToClientMessage> receiver) {
|
||||
var messageHandler = new RpcMessageHandler<TServerToClientMessage>(receiver, SendChannel);
|
||||
var frameReader = new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, messageRegistry, messageReceiveTracker, messageHandler, SendChannel);
|
||||
var frameReader = new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, serverToClientMessageRegistry, messageHandler, SendChannel);
|
||||
try {
|
||||
await connection.ReadConnection(frameReader, shutdownCancellationTokenSource.Token);
|
||||
await connection.ReadConnection(frameReader, CancellationToken.None);
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
}
|
||||
@@ -49,13 +47,11 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
|
||||
}
|
||||
|
||||
try {
|
||||
connection.StopReconnecting();
|
||||
connection.Close();
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught exception while closing connection.");
|
||||
}
|
||||
|
||||
await shutdownCancellationTokenSource.CancelAsync();
|
||||
|
||||
logger.Information("Client shut down.");
|
||||
}
|
||||
|
||||
|
@@ -1,5 +1,4 @@
|
||||
using System.Net.Sockets;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Frame;
|
||||
using Serilog;
|
||||
|
||||
@@ -13,7 +12,7 @@ sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerCon
|
||||
|
||||
private readonly CancellationTokenSource newConnectionCancellationTokenSource = new ();
|
||||
|
||||
public async Task<Stream> GetStream(CancellationToken cancellationToken) {
|
||||
public async Task<Stream> GetStream() {
|
||||
return (await GetConnection()).Stream;
|
||||
}
|
||||
|
||||
@@ -55,22 +54,20 @@ sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerCon
|
||||
throw;
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Socket was closed.");
|
||||
} catch (SocketException e) {
|
||||
logger.Error("Socket reading was interrupted. Socket error {ErrorCode} ({ErrorCodeName}), reason: {ErrorMessage}", e.ErrorCode, e.SocketErrorCode, e.Message);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Socket reading was interrupted.");
|
||||
}
|
||||
|
||||
try {
|
||||
await connection.Shutdown();
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught exception closing the socket.");
|
||||
logger.Error(e, "Closing socket due to an exception while reading it.");
|
||||
|
||||
try {
|
||||
await connection.Shutdown();
|
||||
} catch (Exception e2) {
|
||||
logger.Error(e2, "Caught exception closing the socket.");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
await connection.Disconnect();
|
||||
await connection.Disconnect(); // TODO what happens if already disconnected?
|
||||
} finally {
|
||||
connection.Dispose();
|
||||
}
|
||||
@@ -78,7 +75,7 @@ sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerCon
|
||||
}
|
||||
}
|
||||
|
||||
public void StopReconnecting() {
|
||||
public void Close() {
|
||||
newConnectionCancellationTokenSource.Cancel();
|
||||
}
|
||||
|
||||
|
@@ -53,8 +53,7 @@ internal sealed class RpcClientToServerConnector {
|
||||
return newConnection;
|
||||
}
|
||||
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
logger.Information("Trying to reconnect in {Seconds}s.", nextAttemptDelay.TotalSeconds.ToString("F1"));
|
||||
logger.Warning("Failed to connect to server, trying again in {}.", nextAttemptDelay.TotalSeconds.ToString("F1"));
|
||||
|
||||
await Task.Delay(nextAttemptDelay, cancellationToken);
|
||||
nextAttemptDelay = Comparables.Min(nextAttemptDelay.Multiply(1.5), MaximumRetryDelay);
|
||||
@@ -67,12 +66,9 @@ internal sealed class RpcClientToServerConnector {
|
||||
Socket clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
|
||||
try {
|
||||
await clientSocket.ConnectAsync(parameters.Host, parameters.Port, cancellationToken);
|
||||
} catch (SocketException e) {
|
||||
logger.Error("Could not connect. Socket error {ErrorCode} ({ErrorCodeName}), reason: {ErrorMessage}", e.ErrorCode, e.SocketErrorCode, e.Message);
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not connect.");
|
||||
return null;
|
||||
throw;
|
||||
}
|
||||
|
||||
SslStream? stream;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
interface IRpcConnectionProvider {
|
||||
Task<Stream> GetStream(CancellationToken cancellationToken);
|
||||
Task<Stream> GetStream();
|
||||
}
|
||||
|
@@ -3,8 +3,7 @@
|
||||
public enum RpcError : byte {
|
||||
InvalidData = 0,
|
||||
UnknownMessageRegistryCode = 1,
|
||||
MessageTooLarge = 2,
|
||||
MessageDeserializationError = 3,
|
||||
MessageHandlingError = 4,
|
||||
MessageAlreadyHandled = 5,
|
||||
MessageDeserializationError = 2,
|
||||
MessageHandlingError = 3,
|
||||
MessageTooLarge = 4,
|
||||
}
|
||||
|
@@ -1,14 +1,13 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
sealed class RpcErrorException : Exception {
|
||||
public sealed class RpcErrorException : Exception {
|
||||
internal static RpcErrorException From(RpcError error) {
|
||||
return error switch {
|
||||
RpcError.InvalidData => new RpcErrorException("Invalid data", error),
|
||||
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
|
||||
RpcError.MessageTooLarge => new RpcErrorException("Message is too large", error),
|
||||
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
|
||||
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
|
||||
RpcError.MessageAlreadyHandled => new RpcErrorException("Message already handled", error),
|
||||
RpcError.MessageTooLarge => new RpcErrorException("Message is too large", error),
|
||||
_ => new RpcErrorException("Unknown error", error),
|
||||
};
|
||||
}
|
||||
|
@@ -9,13 +9,12 @@ namespace Phantom.Utils.Rpc.Runtime;
|
||||
sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(
|
||||
string loggerName,
|
||||
MessageRegistry<TReceivedMessage> messageRegistry,
|
||||
MessageReceiveTracker messageReceiveTracker,
|
||||
RpcMessageHandler<TReceivedMessage> messageHandler,
|
||||
RpcSendChannel<TSentMessage> sendChannel
|
||||
) : IFrameReader {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcFrameReader<TSentMessage, TReceivedMessage>>(loggerName);
|
||||
|
||||
public ValueTask OnPingFrame(DateTimeOffset pingTime, CancellationToken cancellationToken) {
|
||||
public ValueTask OnPing(DateTimeOffset pingTime, CancellationToken cancellationToken) {
|
||||
messageHandler.OnPing();
|
||||
return sendChannel.SendPong(pingTime, cancellationToken);
|
||||
}
|
||||
@@ -25,11 +24,6 @@ sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(
|
||||
}
|
||||
|
||||
public Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken) {
|
||||
if (!messageReceiveTracker.ReceiveMessage(frame.MessageId)) {
|
||||
logger.Warning("Received duplicate message {MessageId}.", frame.MessageId);
|
||||
return messageHandler.SendError(frame.MessageId, RpcError.MessageAlreadyHandled, cancellationToken).AsTask();
|
||||
}
|
||||
|
||||
if (messageRegistry.TryGetType(frame, out var messageType)) {
|
||||
logger.Verbose("Received message {MesageId} of type {MessageType} ({Bytes} B).", frame.MessageId, messageType.Name, frame.SerializedMessage.Length);
|
||||
}
|
||||
@@ -47,7 +41,7 @@ sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(
|
||||
sendChannel.ReceiveError(frame.ReplyingToMessageId, frame.Error);
|
||||
}
|
||||
|
||||
public void OnUnknownFrameId(byte frameId) {
|
||||
logger.Error("Received unknown frame ID: {FrameId}", frameId);
|
||||
public void OnUnknownFrameStart(byte id) {
|
||||
logger.Error("Received unknown frame ID: {FrameId}", id);
|
||||
}
|
||||
}
|
||||
|
@@ -19,7 +19,6 @@ public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable
|
||||
private readonly Task sendQueueTask;
|
||||
private readonly Task pingTask;
|
||||
|
||||
private readonly CancellationTokenSource sendQueueCancellationTokenSource = new ();
|
||||
private readonly CancellationTokenSource pingCancellationTokenSource = new ();
|
||||
|
||||
private uint nextMessageId;
|
||||
@@ -89,15 +88,13 @@ public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable
|
||||
}
|
||||
|
||||
private async Task ProcessSendQueue() {
|
||||
CancellationToken cancellationToken = sendQueueCancellationTokenSource.Token;
|
||||
|
||||
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync(cancellationToken)) {
|
||||
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync()) {
|
||||
while (true) {
|
||||
try {
|
||||
Stream stream = await connectionProvider.GetStream(cancellationToken);
|
||||
await stream.WriteAsync(frame.FrameType, cancellationToken);
|
||||
await frame.Write(stream, cancellationToken);
|
||||
await stream.FlushAsync(cancellationToken);
|
||||
Stream stream = await connectionProvider.GetStream();
|
||||
await stream.WriteAsync(frame.FrameType);
|
||||
await frame.Write(stream);
|
||||
await stream.FlushAsync();
|
||||
break;
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
@@ -118,16 +115,15 @@ public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable
|
||||
pongTask = new TaskCompletionSource<DateTimeOffset>();
|
||||
|
||||
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
logger.Warning("Skipped a ping due to a full queue.");
|
||||
continue;
|
||||
}
|
||||
|
||||
DateTimeOffset pingTime = await pongTask.Task.WaitAsync(cancellationToken);
|
||||
DateTimeOffset pingTime = await pongTask.Task;
|
||||
DateTimeOffset currentTime = DateTimeOffset.UtcNow;
|
||||
|
||||
TimeSpan roundTripTime = currentTime - pingTime;
|
||||
logger.Information("Received pong, round trip time: {RoundTripTime} ms", (long) roundTripTime.TotalMilliseconds);
|
||||
logger.Information("Received pong (rtt: {RoundTripTime} ms).", (long) roundTripTime.TotalMilliseconds);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,16 +144,7 @@ public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable
|
||||
sendQueue.Writer.TryComplete();
|
||||
|
||||
try {
|
||||
await pingTask;
|
||||
} catch (Exception) {
|
||||
// Ignore.
|
||||
}
|
||||
|
||||
try {
|
||||
await sendQueueTask.WaitAsync(TimeSpan.FromSeconds(15));
|
||||
} catch (TimeoutException) {
|
||||
logger.Warning("Could not finish processing send queue before timeout, forcibly shutting it down.");
|
||||
await sendQueueCancellationTokenSource.CancelAsync();
|
||||
await Task.WhenAll(sendQueueTask, pingTask);
|
||||
} catch (Exception) {
|
||||
// Ignore.
|
||||
}
|
||||
@@ -165,7 +152,6 @@ public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable
|
||||
|
||||
public void Dispose() {
|
||||
sendQueueTask.Dispose();
|
||||
sendQueueCancellationTokenSource.Dispose();
|
||||
pingCancellationTokenSource.Dispose();
|
||||
}
|
||||
}
|
||||
|
@@ -17,7 +17,7 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
||||
IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar
|
||||
) {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions = new (connectionParameters.Common, messageDefinitions.ToClient);
|
||||
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions = new (loggerName, connectionParameters.Common, messageDefinitions.ToClient);
|
||||
private readonly List<Client> clients = [];
|
||||
|
||||
public async Task<bool> Run(CancellationToken shutdownToken) {
|
||||
@@ -102,7 +102,7 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
||||
|
||||
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
||||
|
||||
private ILogger logger;
|
||||
private readonly ILogger logger;
|
||||
private readonly string serverLoggerName;
|
||||
|
||||
private readonly IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions;
|
||||
@@ -151,10 +151,9 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
||||
}
|
||||
|
||||
if (sessionIdResult.HasValue) {
|
||||
logger.Information("Client connected.");
|
||||
await RunConnectedSession(sessionIdResult.Value, stream);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Caught exception while processing client.");
|
||||
} finally {
|
||||
logger.Information("Disconnecting client...");
|
||||
try {
|
||||
@@ -221,31 +220,10 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
||||
|
||||
private async Task RunConnectedSession(Guid sessionId, Stream stream) {
|
||||
var loggerName = PhantomLogger.ConcatNames(serverLoggerName, clientSessions.NextLoggerName(sessionId));
|
||||
|
||||
logger.Information("Client connected with session {SessionId}, new logger name: {LoggerName}", sessionId, loggerName);
|
||||
logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>, Client>(loggerName);
|
||||
|
||||
var session = clientSessions.OnConnected(sessionId, loggerName, stream);
|
||||
var session = clientSessions.OnConnected(sessionId, stream);
|
||||
try {
|
||||
var connection = new RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage>(loggerName, clientSessions, sessionId, messageDefinitions.ToServer, stream, session);
|
||||
|
||||
IMessageReceiver<TClientToServerMessage> messageReceiver;
|
||||
try {
|
||||
messageReceiver = clientRegistrar.Register(connection);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not register client.");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await connection.Listen(messageReceiver);
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Socket reading was interrupted, connection lost.");
|
||||
} catch (SocketException e) {
|
||||
logger.Error("Socket reading was interrupted. Socket error {ErrorCode} ({ErrorCodeName}), reason: {ErrorMessage}", e.ErrorCode, e.SocketErrorCode, e.Message);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Socket reading was interrupted.");
|
||||
}
|
||||
await connection.Listen(clientRegistrar.Register(connection));
|
||||
} finally {
|
||||
clientSessions.OnDisconnected(sessionId);
|
||||
}
|
||||
|
@@ -4,13 +4,11 @@ namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||
|
||||
sealed class RpcServerClientSession<TServerToClientMessage> : IRpcConnectionProvider {
|
||||
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
||||
public MessageReceiveTracker MessageReceiveTracker { get; }
|
||||
|
||||
private TaskCompletionSource<Stream> nextStream = new ();
|
||||
|
||||
public RpcServerClientSession(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||
this.SendChannel = new RpcSendChannel<TServerToClientMessage>(loggerName, connectionParameters, this, messageRegistry);
|
||||
this.MessageReceiveTracker = new MessageReceiveTracker();
|
||||
}
|
||||
|
||||
public void OnConnected(Stream stream) {
|
||||
@@ -31,7 +29,7 @@ sealed class RpcServerClientSession<TServerToClientMessage> : IRpcConnectionProv
|
||||
}
|
||||
}
|
||||
|
||||
Task<Stream> IRpcConnectionProvider.GetStream(CancellationToken cancellationToken) {
|
||||
Task<Stream> IRpcConnectionProvider.GetStream() {
|
||||
lock (this) {
|
||||
return nextStream.Task;
|
||||
}
|
||||
|
@@ -4,34 +4,23 @@ using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||
|
||||
sealed class RpcServerClientSessions<TServerToClientMessage> {
|
||||
private readonly RpcCommonConnectionParameters connectionParameters;
|
||||
private readonly MessageRegistry<TServerToClientMessage> messageRegistry;
|
||||
|
||||
sealed class RpcServerClientSessions<TServerToClientMessage>(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||
private readonly ConcurrentDictionary<Guid, RpcServerClientSession<TServerToClientMessage>> sessionsById = new ();
|
||||
private readonly ConcurrentDictionary<Guid, uint> sessionLoggerSequenceIds = new ();
|
||||
|
||||
private readonly Func<Guid, string, RpcServerClientSession<TServerToClientMessage>> createSessionFunction;
|
||||
|
||||
public RpcServerClientSessions(RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||
this.connectionParameters = connectionParameters;
|
||||
this.messageRegistry = messageRegistry;
|
||||
this.createSessionFunction = CreateSession;
|
||||
}
|
||||
|
||||
public string NextLoggerName(Guid sessionId) {
|
||||
string name = PhantomLogger.ShortenGuid(sessionId);
|
||||
return name + "/" + sessionLoggerSequenceIds.AddOrUpdate(sessionId, static _ => 1, static (_, prev) => prev + 1);
|
||||
}
|
||||
|
||||
public RpcServerClientSession<TServerToClientMessage> OnConnected(Guid sessionId, string loggerName, Stream stream) {
|
||||
var session = sessionsById.GetOrAdd(sessionId, createSessionFunction, loggerName);
|
||||
public RpcSendChannel<TServerToClientMessage> OnConnected(Guid sessionId, Stream stream) {
|
||||
var session = sessionsById.GetOrAdd(sessionId, CreateSession);
|
||||
session.OnConnected(stream);
|
||||
return session;
|
||||
return session.SendChannel;
|
||||
}
|
||||
|
||||
private RpcServerClientSession<TServerToClientMessage> CreateSession(Guid sessionId, string loggerName) {
|
||||
return new RpcServerClientSession<TServerToClientMessage>(loggerName, connectionParameters, messageRegistry);
|
||||
private RpcServerClientSession<TServerToClientMessage> CreateSession(Guid sessionId) {
|
||||
return new RpcServerClientSession<TServerToClientMessage>(PhantomLogger.ConcatNames(loggerName, sessionId.ToString()), connectionParameters, messageRegistry);
|
||||
}
|
||||
|
||||
public void OnDisconnected(Guid sessionId) {
|
||||
|
@@ -1,49 +1,38 @@
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Frame;
|
||||
using Phantom.Utils.Rpc.Frame;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||
|
||||
public sealed class RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> {
|
||||
private readonly string loggerName;
|
||||
private readonly ILogger logger;
|
||||
private readonly RpcServerClientSessions<TServerToClientMessage> sessions;
|
||||
private readonly MessageRegistry<TClientToServerMessage> messageRegistry;
|
||||
private readonly MessageReceiveTracker messageReceiveTracker;
|
||||
private readonly Stream stream;
|
||||
private readonly CancellationTokenSource closeCancellationTokenSource = new ();
|
||||
|
||||
public Guid SessionId { get; }
|
||||
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
||||
|
||||
internal RpcServerToClientConnection(string loggerName, RpcServerClientSessions<TServerToClientMessage> sessions, Guid sessionId, MessageRegistry<TClientToServerMessage> messageRegistry, Stream stream, RpcServerClientSession<TServerToClientMessage> session) {
|
||||
internal RpcServerToClientConnection(string loggerName, RpcServerClientSessions<TServerToClientMessage> sessions, Guid sessionId, MessageRegistry<TClientToServerMessage> messageRegistry, Stream stream, RpcSendChannel<TServerToClientMessage> sendChannel) {
|
||||
this.loggerName = loggerName;
|
||||
this.logger = PhantomLogger.Create<RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||
this.sessions = sessions;
|
||||
this.messageRegistry = messageRegistry;
|
||||
this.messageReceiveTracker = session.MessageReceiveTracker;
|
||||
this.stream = stream;
|
||||
|
||||
this.SessionId = sessionId;
|
||||
this.SendChannel = session.SendChannel;
|
||||
this.SendChannel = sendChannel;
|
||||
}
|
||||
|
||||
public Task CloseSession() {
|
||||
return sessions.CloseSession(SessionId);
|
||||
}
|
||||
|
||||
internal async Task Listen(IMessageReceiver<TClientToServerMessage> receiver) {
|
||||
var messageHandler = new RpcMessageHandler<TClientToServerMessage>(receiver, SendChannel);
|
||||
var frameReader = new RpcFrameReader<TServerToClientMessage, TClientToServerMessage>(loggerName, messageRegistry, messageReceiveTracker, messageHandler, SendChannel);
|
||||
var frameReader = new RpcFrameReader<TServerToClientMessage, TClientToServerMessage>(loggerName, messageRegistry, messageHandler, SendChannel);
|
||||
try {
|
||||
await IFrame.ReadFrom(stream, frameReader, closeCancellationTokenSource.Token);
|
||||
await IFrame.ReadFrom(stream, frameReader, CancellationToken.None);
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
|
||||
public async Task ClientClosedSession() {
|
||||
logger.Information("Client closed session.");
|
||||
|
||||
Task closeSessionTask = sessions.CloseSession(SessionId);
|
||||
await closeCancellationTokenSource.CancelAsync();
|
||||
await closeSessionTask;
|
||||
}
|
||||
}
|
||||
|
@@ -1,93 +0,0 @@
|
||||
using NUnit.Framework;
|
||||
using Phantom.Utils.Collections;
|
||||
using Range = Phantom.Utils.Collections.RangeSet<int>.Range;
|
||||
|
||||
namespace Phantom.Utils.Tests.Collections;
|
||||
|
||||
[TestFixture]
|
||||
public class RangeSetTests {
|
||||
[Test]
|
||||
public void OneValue() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(5);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 5, Max: 5),
|
||||
}));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void MultipleDisjointValues() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(5);
|
||||
set.Add(7);
|
||||
set.Add(1);
|
||||
set.Add(3);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 1, Max: 1),
|
||||
new Range(Min: 3, Max: 3),
|
||||
new Range(Min: 5, Max: 5),
|
||||
new Range(Min: 7, Max: 7),
|
||||
}));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ExtendMin() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(5);
|
||||
set.Add(4);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 4, Max: 5),
|
||||
}));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ExtendMax() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(5);
|
||||
set.Add(6);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 5, Max: 6),
|
||||
}));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ExtendMaxAndMerge() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(5);
|
||||
set.Add(7);
|
||||
set.Add(6);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 5, Max: 7),
|
||||
}));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void MultipleMergingAndDisjointValues() {
|
||||
var set = new RangeSet<int>();
|
||||
set.Add(1);
|
||||
set.Add(2);
|
||||
set.Add(5);
|
||||
set.Add(4);
|
||||
set.Add(10);
|
||||
set.Add(7);
|
||||
set.Add(9);
|
||||
set.Add(11);
|
||||
set.Add(16);
|
||||
set.Add(12);
|
||||
set.Add(3);
|
||||
set.Add(14);
|
||||
|
||||
Assert.That(set, Is.EqualTo(new[] {
|
||||
new Range(Min: 1, Max: 5),
|
||||
new Range(Min: 7, Max: 7),
|
||||
new Range(Min: 9, Max: 12),
|
||||
new Range(Min: 14, Max: 14),
|
||||
new Range(Min: 16, Max: 16),
|
||||
}));
|
||||
}
|
||||
}
|
@@ -1,69 +0,0 @@
|
||||
using System.Collections;
|
||||
using System.Numerics;
|
||||
|
||||
namespace Phantom.Utils.Collections;
|
||||
|
||||
public sealed class RangeSet<T> : IEnumerable<RangeSet<T>.Range> where T : IBinaryInteger<T> {
|
||||
private readonly List<Range> ranges = [];
|
||||
|
||||
public bool Add(T value) {
|
||||
int index = 0;
|
||||
|
||||
for (; index < ranges.Count; index++) {
|
||||
var range = ranges[index];
|
||||
if (range.Contains(value)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (range.ExtendIfAtEdge(value, out var extendedRange)) {
|
||||
ranges[index] = extendedRange;
|
||||
|
||||
if (index < ranges.Count - 1) {
|
||||
var nextRange = ranges[index + 1];
|
||||
if (extendedRange.Max + T.One == nextRange.Min) {
|
||||
ranges[index] = new Range(extendedRange.Min, nextRange.Max);
|
||||
ranges.RemoveAt(index + 1);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (range.Max > value) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ranges.Insert(index, new Range(value, value));
|
||||
return true;
|
||||
}
|
||||
|
||||
public IEnumerator<Range> GetEnumerator() {
|
||||
return ranges.GetEnumerator();
|
||||
}
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator() {
|
||||
return GetEnumerator();
|
||||
}
|
||||
|
||||
public readonly record struct Range(T Min, T Max) {
|
||||
internal bool ExtendIfAtEdge(T value, out Range newRange) {
|
||||
if (value == Min - T.One) {
|
||||
newRange = this with { Min = value };
|
||||
return true;
|
||||
}
|
||||
else if (value == Max + T.One) {
|
||||
newRange = this with { Max = value };
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
newRange = default;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
internal bool Contains(T value) {
|
||||
return value >= Min && value <= Max;
|
||||
}
|
||||
}
|
||||
}
|
@@ -93,14 +93,11 @@ try {
|
||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||
await webApplication.StopAsync();
|
||||
} finally {
|
||||
PhantomLogger.Root.Information("Unregistering web...");
|
||||
try {
|
||||
using var unregisterCancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage(), unregisterCancellationTokenSource.Token);
|
||||
} catch (OperationCanceledException) {
|
||||
PhantomLogger.Root.Warning("Could not unregister web after shutdown.");
|
||||
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage());
|
||||
// TODO wait for acknowledgment
|
||||
} catch (Exception e) {
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister web after shutdown.");
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
||||
} finally {
|
||||
await rpcClient.Shutdown();
|
||||
|
||||
|
Reference in New Issue
Block a user