1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-17 12:24:49 +02:00

2 Commits

Author SHA1 Message Date
a8fe48ec77 Replace NetMQ with custom TCP logic 2025-09-12 01:15:20 +02:00
62d10f248d Replace NetMQ with custom TCP logic 2025-09-11 20:20:16 +02:00
12 changed files with 203 additions and 227 deletions

View File

@@ -4,7 +4,6 @@ using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
@@ -53,24 +52,15 @@ public sealed class AgentServices {
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) { public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
Logger.Information("Registering with the controller..."); Logger.Information("Registering with the controller...");
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult; ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
try { try {
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken); configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
} catch (Exception e) { } catch (Exception e) {
Logger.Fatal(e, "Registration failed."); Logger.Fatal(e, "Registration failed.");
return false; return false;
} }
if (!registrationResult) { foreach (var configureInstanceMessage in configureInstanceMessages) {
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
_ => "Unknown error " + (byte) registrationResult.Error + ".",
});
return false;
}
foreach (var configureInstanceMessage in registrationResult.Value) {
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand( var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
configureInstanceMessage.InstanceGuid, configureInstanceMessage.InstanceGuid,
configureInstanceMessage.Configuration, configureInstanceMessage.Configuration,

View File

@@ -8,7 +8,7 @@ public readonly record struct ConnectionKey(RpcCertificateThumbprint Certificate
public byte[] ToBytes() { public byte[] ToBytes() {
Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length]; Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length];
AuthToken.WriteTo(result[..TokenLength]); AuthToken.Bytes.CopyTo(result[..TokenLength]);
CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]); CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]);
return result.ToArray(); return result.ToArray();
} }

View File

@@ -1,5 +0,0 @@
namespace Phantom.Common.Data.Replies;
public enum RegisterAgentError : byte {
ConnectionAlreadyHasAnAgent = 0,
}

View File

@@ -1,4 +1,5 @@
using Phantom.Common.Data; using System.Collections.Immutable;
using Phantom.Common.Data;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
@@ -19,7 +20,7 @@ public static class AgentMessageRegistries {
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4); ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5); ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
ToController.Add<RegisterAgentMessage>(0); ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
ToController.Add<UnregisterAgentMessage>(1); ToController.Add<UnregisterAgentMessage>(1);
ToController.Add<AgentIsAliveMessage>(2); ToController.Add<AgentIsAliveMessage>(2);
ToController.Add<AdvertiseJavaRuntimesMessage>(3); ToController.Add<AdvertiseJavaRuntimesMessage>(3);

View File

@@ -1,9 +0,0 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
) : IMessageToAgent;

View File

@@ -1,9 +0,0 @@
using System.Collections.Immutable;
using MemoryPack;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentSuccessMessage(
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
) : IMessageToAgent;

View File

@@ -1,8 +1,6 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using MemoryPack; using MemoryPack;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
@@ -11,4 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentMessage( public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AgentInfo AgentInfo [property: MemoryPackOrder(0)] AgentInfo AgentInfo
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>; ) : IMessageToController, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;

View File

@@ -87,7 +87,6 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes); ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog); ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog);
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog); ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
Receive<ReplyMessage>(HandleReply);
} }
private async Task HandleRegisterWeb(RegisterWebMessage message) { private async Task HandleRegisterWeb(RegisterWebMessage message) {
@@ -189,8 +188,4 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) { private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count); return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count);
} }
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
} }

View File

@@ -71,41 +71,16 @@ internal sealed class RpcClientConnector {
throw; throw;
} }
SslStream? stream = null; SslStream? stream;
bool handledException = false;
try { try {
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false); stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
try { if (await FinalizeConnection(stream, cancellationToken)) {
loggedCertificateValidationError = false;
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
} catch (AuthenticationException e) {
if (!loggedCertificateValidationError) {
logger.Error(e, "Could not establish a secure connection.");
}
handledException = true;
throw;
}
logger.Information("Established a secure connection.");
try {
await PerformApplicationHandshake(stream, cancellationToken);
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
handledException = true;
throw;
} catch (Exception e) {
logger.Warning(e, "Could not perform application handshake.");
handledException = true;
throw;
}
return new Connection(clientSocket, stream); return new Connection(clientSocket, stream);
}
} catch (Exception e) { } catch (Exception e) {
if (!handledException) {
logger.Error(e, "Caught unhandled exception."); logger.Error(e, "Caught unhandled exception.");
stream = null;
} }
try { try {
@@ -116,6 +91,51 @@ internal sealed class RpcClientConnector {
return null; return null;
} }
private async Task<bool> FinalizeConnection(SslStream stream, CancellationToken cancellationToken) {
try {
loggedCertificateValidationError = false;
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
} catch (AuthenticationException e) {
if (!loggedCertificateValidationError) {
logger.Error(e, "Could not establish a secure connection.");
}
return false;
}
logger.Information("Established a secure connection.");
try {
if (!await PerformApplicationHandshake(stream, cancellationToken)) {
return false;
}
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
} catch (Exception e) {
logger.Warning(e, "Could not perform application handshake.");
}
return true;
}
private async Task<bool> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
var result = (RpcHandshakeResult) await Serialization.ReadByte(stream, cancellationToken);
switch (result) {
case RpcHandshakeResult.Success:
return true;
case RpcHandshakeResult.InvalidAuthToken:
logger.Error("Server rejected authorization token.");
return false;
default:
logger.Error("Server rejected client due to unknown error: {ErrorId}", result);
return false;
}
} }
private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) { private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) {
@@ -142,12 +162,6 @@ internal sealed class RpcClientConnector {
return false; return false;
} }
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
// TODO read response
}
private static async Task DisconnectSocket(Socket socket, Stream? stream) { private static async Task DisconnectSocket(Socket socket, Stream? stream) {
if (stream != null) { if (stream != null) {
await stream.DisposeAsync(); await stream.DisposeAsync();

View File

@@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcServerClient {
}

View File

@@ -1,135 +1,135 @@
using System.Collections.Concurrent; // using System.Collections.Concurrent;
using Akka.Actor; // using Akka.Actor;
using NetMQ.Sockets; // using NetMQ.Sockets;
using Phantom.Utils.Actor; // using Phantom.Utils.Actor;
using Phantom.Utils.Logging; // using Phantom.Utils.Logging;
using Serilog; // using Serilog;
using Serilog.Events; // using Serilog.Events;
//
namespace Phantom.Utils.Rpc.Runtime2; // namespace Phantom.Utils.Rpc.Runtime2;
//
public static class RpcServerRuntime { // public static class RpcServerRuntime {
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>( // public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
RpcConfiguration config, // RpcConfiguration config,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, // IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, // IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
IActorRefFactory actorSystem, // IActorRefFactory actorSystem,
CancellationToken cancellationToken // CancellationToken cancellationToken
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage { // ) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken); // return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
} // }
} // }
//
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage { // internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) { // internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config); // var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch(); // return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
} // }
//
private readonly string serviceName; // private readonly string serviceName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions; // private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler; // private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly IActorRefFactory actorSystem; // private readonly IActorRefFactory actorSystem;
private readonly CancellationToken cancellationToken; // private readonly CancellationToken cancellationToken;
//
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) { // private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName; // this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions; // this.messageDefinitions = messageDefinitions;
this.registrationHandler = registrationHandler; // this.registrationHandler = registrationHandler;
this.actorSystem = actorSystem; // this.actorSystem = actorSystem;
this.cancellationToken = cancellationToken; // this.cancellationToken = cancellationToken;
} // }
//
private protected override Task Run(ServerSocket socket) { // private protected override Task Run(ServerSocket socket) {
var clients = new ConcurrentDictionary<ulong, Client>(); // var clients = new ConcurrentDictionary<ulong, Client>();
//
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { // void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
if (clients.Remove(e.RoutingId, out var client)) { // if (clients.Remove(e.RoutingId, out var client)) {
client.Connection.Closed -= OnConnectionClosed; // client.Connection.Closed -= OnConnectionClosed;
} // }
} // }
//
while (!cancellationToken.IsCancellationRequested) { // while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken); // var (routingId, data) = socket.Receive(cancellationToken);
//
if (data.Length == 0) { // if (data.Length == 0) {
LogUnknownMessage(routingId, data); // LogUnknownMessage(routingId, data);
continue; // continue;
} // }
//
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null; // Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (messageType == null) { // if (messageType == null) {
LogUnknownMessage(routingId, data); // LogUnknownMessage(routingId, data);
continue; // continue;
} // }
//
if (!clients.TryGetValue(routingId, out var client)) { // if (!clients.TryGetValue(routingId, out var client)) {
if (messageType != typeof(TRegistrationMessage)) { // if (messageType != typeof(TRegistrationMessage)) {
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId); // RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
continue; // continue;
} // }
//
var clientLoggerName = LoggerName + ":" + routingId; // var clientLoggerName = LoggerName + ":" + routingId;
var clientActorName = "Rpc-" + serviceName + "-" + routingId; // var clientActorName = "Rpc-" + serviceName + "-" + routingId;
//
// TODO add pings and tear down connection after too much inactivity // // TODO add pings and tear down connection after too much inactivity
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker); // var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed; // connection.Closed += OnConnectionClosed;
//
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler); // client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
clients[routingId] = client; // clients[routingId] = client;
} // }
//
client.Enqueue(messageType, data); // client.Enqueue(messageType, data);
} // }
//
foreach (var client in clients.Values) { // foreach (var client in clients.Values) {
client.Connection.Close(); // client.Connection.Close();
} // }
//
return Task.CompletedTask; // return Task.CompletedTask;
} // }
//
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) { // private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId); // RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
} // }
//
private protected override Task Disconnect(ServerSocket socket) { // private protected override Task Disconnect(ServerSocket socket) {
return Task.CompletedTask; // return Task.CompletedTask;
} // }
//
private sealed class Client { // private sealed class Client {
public RpcConnectionToClient<TClientMessage> Connection { get; } // public RpcConnectionToClient<TClientMessage> Connection { get; }
//
private readonly ILogger logger; // private readonly ILogger logger;
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor; // private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
//
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) { // public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
this.Connection = connection; // this.Connection = connection;
this.Connection.Closed += OnConnectionClosed; // this.Connection.Closed += OnConnectionClosed;
//
this.logger = PhantomLogger.Create(loggerName); // this.logger = PhantomLogger.Create(loggerName);
//
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection); // var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver"); // this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
} // }
//
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) { // internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data); // LogMessageType(messageType, data);
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data)); // receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
} // }
//
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) { // private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
if (logger.IsEnabled(LogEventLevel.Verbose)) { // if (logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length); // logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
} // }
} // }
//
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { // private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
Connection.Closed -= OnConnectionClosed; // Connection.Closed -= OnConnectionClosed;
//
logger.Debug("Closing connection..."); // logger.Debug("Closing connection...");
receiverActor.Stop(); // receiverActor.Stop();
} // }
} // }
} // }

View File

@@ -74,10 +74,6 @@ sealed class Base24 {
} }
public byte[] Decode(ReadOnlySpan<char> data) { public byte[] Decode(ReadOnlySpan<char> data) {
if (data == null) {
throw new ArgumentNullException(nameof(data));
}
if (data.Length % 7 != 0) { if (data.Length % 7 != 0) {
throw new ArgumentException("The data length must be multiple of 7 chars."); throw new ArgumentException("The data length must be multiple of 7 chars.");
} }