mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-17 12:24:49 +02:00
Compare commits
2 Commits
3f8e46ae2d
...
a8fe48ec77
Author | SHA1 | Date | |
---|---|---|---|
a8fe48ec77
|
|||
62d10f248d
|
@@ -4,7 +4,6 @@ using Phantom.Agent.Minecraft.Java;
|
||||
using Phantom.Agent.Services.Backups;
|
||||
using Phantom.Agent.Services.Instances;
|
||||
using Phantom.Agent.Services.Rpc;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
@@ -53,24 +52,15 @@ public sealed class AgentServices {
|
||||
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
||||
Logger.Information("Registering with the controller...");
|
||||
|
||||
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult;
|
||||
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
|
||||
try {
|
||||
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||
} catch (Exception e) {
|
||||
Logger.Fatal(e, "Registration failed.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!registrationResult) {
|
||||
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
|
||||
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
|
||||
_ => "Unknown error " + (byte) registrationResult.Error + ".",
|
||||
});
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach (var configureInstanceMessage in registrationResult.Value) {
|
||||
foreach (var configureInstanceMessage in configureInstanceMessages) {
|
||||
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
|
||||
configureInstanceMessage.InstanceGuid,
|
||||
configureInstanceMessage.Configuration,
|
||||
|
@@ -8,7 +8,7 @@ public readonly record struct ConnectionKey(RpcCertificateThumbprint Certificate
|
||||
|
||||
public byte[] ToBytes() {
|
||||
Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length];
|
||||
AuthToken.WriteTo(result[..TokenLength]);
|
||||
AuthToken.Bytes.CopyTo(result[..TokenLength]);
|
||||
CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]);
|
||||
return result.ToArray();
|
||||
}
|
||||
|
@@ -1,5 +0,0 @@
|
||||
namespace Phantom.Common.Data.Replies;
|
||||
|
||||
public enum RegisterAgentError : byte {
|
||||
ConnectionAlreadyHasAnAgent = 0,
|
||||
}
|
@@ -1,4 +1,5 @@
|
||||
using Phantom.Common.Data;
|
||||
using System.Collections.Immutable;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
@@ -19,7 +20,7 @@ public static class AgentMessageRegistries {
|
||||
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
|
||||
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
|
||||
|
||||
ToController.Add<RegisterAgentMessage>(0);
|
||||
ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
|
||||
ToController.Add<UnregisterAgentMessage>(1);
|
||||
ToController.Add<AgentIsAliveMessage>(2);
|
||||
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
|
||||
|
@@ -1,9 +0,0 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Replies;
|
||||
|
||||
namespace Phantom.Common.Messages.Agent.ToAgent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterAgentFailureMessage(
|
||||
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
|
||||
) : IMessageToAgent;
|
@@ -1,9 +0,0 @@
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Common.Messages.Agent.ToAgent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterAgentSuccessMessage(
|
||||
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
|
||||
) : IMessageToAgent;
|
@@ -1,8 +1,6 @@
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
@@ -11,4 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterAgentMessage(
|
||||
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
|
||||
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>;
|
||||
) : IMessageToController, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
|
||||
|
@@ -87,7 +87,6 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
|
||||
ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog);
|
||||
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
|
||||
Receive<ReplyMessage>(HandleReply);
|
||||
}
|
||||
|
||||
private async Task HandleRegisterWeb(RegisterWebMessage message) {
|
||||
@@ -189,8 +188,4 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) {
|
||||
return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count);
|
||||
}
|
||||
|
||||
private void HandleReply(ReplyMessage message) {
|
||||
connection.Receive(message);
|
||||
}
|
||||
}
|
||||
|
@@ -71,50 +71,70 @@ internal sealed class RpcClientConnector {
|
||||
throw;
|
||||
}
|
||||
|
||||
SslStream? stream = null;
|
||||
bool handledException = false;
|
||||
SslStream? stream;
|
||||
try {
|
||||
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
|
||||
|
||||
try {
|
||||
loggedCertificateValidationError = false;
|
||||
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
|
||||
} catch (AuthenticationException e) {
|
||||
if (!loggedCertificateValidationError) {
|
||||
logger.Error(e, "Could not establish a secure connection.");
|
||||
}
|
||||
|
||||
handledException = true;
|
||||
throw;
|
||||
if (await FinalizeConnection(stream, cancellationToken)) {
|
||||
return new Connection(clientSocket, stream);
|
||||
}
|
||||
|
||||
logger.Information("Established a secure connection.");
|
||||
|
||||
try {
|
||||
await PerformApplicationHandshake(stream, cancellationToken);
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
handledException = true;
|
||||
throw;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Could not perform application handshake.");
|
||||
handledException = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
return new Connection(clientSocket, stream);
|
||||
} catch (Exception e) {
|
||||
if (!handledException) {
|
||||
logger.Error(e, "Caught unhandled exception.");
|
||||
logger.Error(e, "Caught unhandled exception.");
|
||||
stream = null;
|
||||
}
|
||||
|
||||
try {
|
||||
await DisconnectSocket(clientSocket, stream);
|
||||
} finally {
|
||||
clientSocket.Close();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private async Task<bool> FinalizeConnection(SslStream stream, CancellationToken cancellationToken) {
|
||||
try {
|
||||
loggedCertificateValidationError = false;
|
||||
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
|
||||
} catch (AuthenticationException e) {
|
||||
if (!loggedCertificateValidationError) {
|
||||
logger.Error(e, "Could not establish a secure connection.");
|
||||
}
|
||||
|
||||
try {
|
||||
await DisconnectSocket(clientSocket, stream);
|
||||
} finally {
|
||||
clientSocket.Close();
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.Information("Established a secure connection.");
|
||||
|
||||
try {
|
||||
if (!await PerformApplicationHandshake(stream, cancellationToken)) {
|
||||
return false;
|
||||
}
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Could not perform application handshake.");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task<bool> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
||||
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
|
||||
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
|
||||
|
||||
var result = (RpcHandshakeResult) await Serialization.ReadByte(stream, cancellationToken);
|
||||
switch (result) {
|
||||
case RpcHandshakeResult.Success:
|
||||
return true;
|
||||
|
||||
return null;
|
||||
case RpcHandshakeResult.InvalidAuthToken:
|
||||
logger.Error("Server rejected authorization token.");
|
||||
return false;
|
||||
|
||||
default:
|
||||
logger.Error("Server rejected client due to unknown error: {ErrorId}", result);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,12 +162,6 @@ internal sealed class RpcClientConnector {
|
||||
return false;
|
||||
}
|
||||
|
||||
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
||||
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
|
||||
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
|
||||
// TODO read response
|
||||
}
|
||||
|
||||
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
|
||||
if (stream != null) {
|
||||
await stream.DisposeAsync();
|
||||
|
5
Utils/Phantom.Utils.Rpc/Runtime/RpcServerClient.cs
Normal file
5
Utils/Phantom.Utils.Rpc/Runtime/RpcServerClient.cs
Normal file
@@ -0,0 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcServerClient {
|
||||
|
||||
}
|
@@ -1,135 +1,135 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime2;
|
||||
|
||||
public static class RpcServerRuntime {
|
||||
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
||||
RpcConfiguration config,
|
||||
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
|
||||
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
|
||||
IActorRefFactory actorSystem,
|
||||
CancellationToken cancellationToken
|
||||
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
|
||||
var socket = RpcServerSocket.Connect(config);
|
||||
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
|
||||
}
|
||||
|
||||
private readonly string serviceName;
|
||||
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
||||
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
||||
private readonly IActorRefFactory actorSystem;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||
this.serviceName = socket.Config.ServiceName;
|
||||
this.messageDefinitions = messageDefinitions;
|
||||
this.registrationHandler = registrationHandler;
|
||||
this.actorSystem = actorSystem;
|
||||
this.cancellationToken = cancellationToken;
|
||||
}
|
||||
|
||||
private protected override Task Run(ServerSocket socket) {
|
||||
var clients = new ConcurrentDictionary<ulong, Client>();
|
||||
|
||||
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||
if (clients.Remove(e.RoutingId, out var client)) {
|
||||
client.Connection.Closed -= OnConnectionClosed;
|
||||
}
|
||||
}
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested) {
|
||||
var (routingId, data) = socket.Receive(cancellationToken);
|
||||
|
||||
if (data.Length == 0) {
|
||||
LogUnknownMessage(routingId, data);
|
||||
continue;
|
||||
}
|
||||
|
||||
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
||||
if (messageType == null) {
|
||||
LogUnknownMessage(routingId, data);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!clients.TryGetValue(routingId, out var client)) {
|
||||
if (messageType != typeof(TRegistrationMessage)) {
|
||||
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
|
||||
continue;
|
||||
}
|
||||
|
||||
var clientLoggerName = LoggerName + ":" + routingId;
|
||||
var clientActorName = "Rpc-" + serviceName + "-" + routingId;
|
||||
|
||||
// TODO add pings and tear down connection after too much inactivity
|
||||
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||
connection.Closed += OnConnectionClosed;
|
||||
|
||||
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
|
||||
clients[routingId] = client;
|
||||
}
|
||||
|
||||
client.Enqueue(messageType, data);
|
||||
}
|
||||
|
||||
foreach (var client in clients.Values) {
|
||||
client.Connection.Close();
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
|
||||
RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
|
||||
}
|
||||
|
||||
private protected override Task Disconnect(ServerSocket socket) {
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private sealed class Client {
|
||||
public RpcConnectionToClient<TClientMessage> Connection { get; }
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
|
||||
|
||||
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
|
||||
this.Connection = connection;
|
||||
this.Connection.Closed += OnConnectionClosed;
|
||||
|
||||
this.logger = PhantomLogger.Create(loggerName);
|
||||
|
||||
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
|
||||
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
|
||||
}
|
||||
|
||||
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
LogMessageType(messageType, data);
|
||||
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||
}
|
||||
|
||||
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
if (logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
|
||||
}
|
||||
}
|
||||
|
||||
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||
Connection.Closed -= OnConnectionClosed;
|
||||
|
||||
logger.Debug("Closing connection...");
|
||||
receiverActor.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
// using System.Collections.Concurrent;
|
||||
// using Akka.Actor;
|
||||
// using NetMQ.Sockets;
|
||||
// using Phantom.Utils.Actor;
|
||||
// using Phantom.Utils.Logging;
|
||||
// using Serilog;
|
||||
// using Serilog.Events;
|
||||
//
|
||||
// namespace Phantom.Utils.Rpc.Runtime2;
|
||||
//
|
||||
// public static class RpcServerRuntime {
|
||||
// public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
||||
// RpcConfiguration config,
|
||||
// IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
|
||||
// IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
|
||||
// IActorRefFactory actorSystem,
|
||||
// CancellationToken cancellationToken
|
||||
// ) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
// return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||
// internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
|
||||
// var socket = RpcServerSocket.Connect(config);
|
||||
// return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
|
||||
// }
|
||||
//
|
||||
// private readonly string serviceName;
|
||||
// private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
||||
// private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
||||
// private readonly IActorRefFactory actorSystem;
|
||||
// private readonly CancellationToken cancellationToken;
|
||||
//
|
||||
// private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||
// this.serviceName = socket.Config.ServiceName;
|
||||
// this.messageDefinitions = messageDefinitions;
|
||||
// this.registrationHandler = registrationHandler;
|
||||
// this.actorSystem = actorSystem;
|
||||
// this.cancellationToken = cancellationToken;
|
||||
// }
|
||||
//
|
||||
// private protected override Task Run(ServerSocket socket) {
|
||||
// var clients = new ConcurrentDictionary<ulong, Client>();
|
||||
//
|
||||
// void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||
// if (clients.Remove(e.RoutingId, out var client)) {
|
||||
// client.Connection.Closed -= OnConnectionClosed;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// while (!cancellationToken.IsCancellationRequested) {
|
||||
// var (routingId, data) = socket.Receive(cancellationToken);
|
||||
//
|
||||
// if (data.Length == 0) {
|
||||
// LogUnknownMessage(routingId, data);
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
||||
// if (messageType == null) {
|
||||
// LogUnknownMessage(routingId, data);
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// if (!clients.TryGetValue(routingId, out var client)) {
|
||||
// if (messageType != typeof(TRegistrationMessage)) {
|
||||
// RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// var clientLoggerName = LoggerName + ":" + routingId;
|
||||
// var clientActorName = "Rpc-" + serviceName + "-" + routingId;
|
||||
//
|
||||
// // TODO add pings and tear down connection after too much inactivity
|
||||
// var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||
// connection.Closed += OnConnectionClosed;
|
||||
//
|
||||
// client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
|
||||
// clients[routingId] = client;
|
||||
// }
|
||||
//
|
||||
// client.Enqueue(messageType, data);
|
||||
// }
|
||||
//
|
||||
// foreach (var client in clients.Values) {
|
||||
// client.Connection.Close();
|
||||
// }
|
||||
//
|
||||
// return Task.CompletedTask;
|
||||
// }
|
||||
//
|
||||
// private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
|
||||
// RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
|
||||
// }
|
||||
//
|
||||
// private protected override Task Disconnect(ServerSocket socket) {
|
||||
// return Task.CompletedTask;
|
||||
// }
|
||||
//
|
||||
// private sealed class Client {
|
||||
// public RpcConnectionToClient<TClientMessage> Connection { get; }
|
||||
//
|
||||
// private readonly ILogger logger;
|
||||
// private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
|
||||
//
|
||||
// public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
|
||||
// this.Connection = connection;
|
||||
// this.Connection.Closed += OnConnectionClosed;
|
||||
//
|
||||
// this.logger = PhantomLogger.Create(loggerName);
|
||||
//
|
||||
// var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
|
||||
// this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
|
||||
// }
|
||||
//
|
||||
// internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
// LogMessageType(messageType, data);
|
||||
// receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||
// }
|
||||
//
|
||||
// private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
// if (logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||
// logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||
// Connection.Closed -= OnConnectionClosed;
|
||||
//
|
||||
// logger.Debug("Closing connection...");
|
||||
// receiverActor.Stop();
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
@@ -74,10 +74,6 @@ sealed class Base24 {
|
||||
}
|
||||
|
||||
public byte[] Decode(ReadOnlySpan<char> data) {
|
||||
if (data == null) {
|
||||
throw new ArgumentNullException(nameof(data));
|
||||
}
|
||||
|
||||
if (data.Length % 7 != 0) {
|
||||
throw new ArgumentException("The data length must be multiple of 7 chars.");
|
||||
}
|
||||
|
Reference in New Issue
Block a user