mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-17 12:24:49 +02:00
Compare commits
2 Commits
3f8e46ae2d
...
a8fe48ec77
Author | SHA1 | Date | |
---|---|---|---|
a8fe48ec77
|
|||
62d10f248d
|
@@ -4,7 +4,6 @@ using Phantom.Agent.Minecraft.Java;
|
|||||||
using Phantom.Agent.Services.Backups;
|
using Phantom.Agent.Services.Backups;
|
||||||
using Phantom.Agent.Services.Instances;
|
using Phantom.Agent.Services.Instances;
|
||||||
using Phantom.Agent.Services.Rpc;
|
using Phantom.Agent.Services.Rpc;
|
||||||
using Phantom.Common.Data;
|
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Data.Replies;
|
using Phantom.Common.Data.Replies;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
@@ -53,24 +52,15 @@ public sealed class AgentServices {
|
|||||||
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
||||||
Logger.Information("Registering with the controller...");
|
Logger.Information("Registering with the controller...");
|
||||||
|
|
||||||
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult;
|
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
|
||||||
try {
|
try {
|
||||||
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Logger.Fatal(e, "Registration failed.");
|
Logger.Fatal(e, "Registration failed.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!registrationResult) {
|
foreach (var configureInstanceMessage in configureInstanceMessages) {
|
||||||
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
|
|
||||||
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
|
|
||||||
_ => "Unknown error " + (byte) registrationResult.Error + ".",
|
|
||||||
});
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var configureInstanceMessage in registrationResult.Value) {
|
|
||||||
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
|
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
|
||||||
configureInstanceMessage.InstanceGuid,
|
configureInstanceMessage.InstanceGuid,
|
||||||
configureInstanceMessage.Configuration,
|
configureInstanceMessage.Configuration,
|
||||||
|
@@ -8,7 +8,7 @@ public readonly record struct ConnectionKey(RpcCertificateThumbprint Certificate
|
|||||||
|
|
||||||
public byte[] ToBytes() {
|
public byte[] ToBytes() {
|
||||||
Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length];
|
Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length];
|
||||||
AuthToken.WriteTo(result[..TokenLength]);
|
AuthToken.Bytes.CopyTo(result[..TokenLength]);
|
||||||
CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]);
|
CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]);
|
||||||
return result.ToArray();
|
return result.ToArray();
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +0,0 @@
|
|||||||
namespace Phantom.Common.Data.Replies;
|
|
||||||
|
|
||||||
public enum RegisterAgentError : byte {
|
|
||||||
ConnectionAlreadyHasAnAgent = 0,
|
|
||||||
}
|
|
@@ -1,4 +1,5 @@
|
|||||||
using Phantom.Common.Data;
|
using System.Collections.Immutable;
|
||||||
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Replies;
|
using Phantom.Common.Data.Replies;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
@@ -19,7 +20,7 @@ public static class AgentMessageRegistries {
|
|||||||
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
|
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
|
||||||
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
|
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
|
||||||
|
|
||||||
ToController.Add<RegisterAgentMessage>(0);
|
ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
|
||||||
ToController.Add<UnregisterAgentMessage>(1);
|
ToController.Add<UnregisterAgentMessage>(1);
|
||||||
ToController.Add<AgentIsAliveMessage>(2);
|
ToController.Add<AgentIsAliveMessage>(2);
|
||||||
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
|
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
|
||||||
|
@@ -1,9 +0,0 @@
|
|||||||
using MemoryPack;
|
|
||||||
using Phantom.Common.Data.Replies;
|
|
||||||
|
|
||||||
namespace Phantom.Common.Messages.Agent.ToAgent;
|
|
||||||
|
|
||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
|
||||||
public sealed partial record RegisterAgentFailureMessage(
|
|
||||||
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
|
|
||||||
) : IMessageToAgent;
|
|
@@ -1,9 +0,0 @@
|
|||||||
using System.Collections.Immutable;
|
|
||||||
using MemoryPack;
|
|
||||||
|
|
||||||
namespace Phantom.Common.Messages.Agent.ToAgent;
|
|
||||||
|
|
||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
|
||||||
public sealed partial record RegisterAgentSuccessMessage(
|
|
||||||
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
|
|
||||||
) : IMessageToAgent;
|
|
@@ -1,8 +1,6 @@
|
|||||||
using System.Collections.Immutable;
|
using System.Collections.Immutable;
|
||||||
using MemoryPack;
|
using MemoryPack;
|
||||||
using Phantom.Common.Data;
|
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Data.Replies;
|
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
|
||||||
@@ -11,4 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
|
|||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||||
public sealed partial record RegisterAgentMessage(
|
public sealed partial record RegisterAgentMessage(
|
||||||
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
|
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
|
||||||
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>;
|
) : IMessageToController, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
|
||||||
|
@@ -87,7 +87,6 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
|
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
|
||||||
ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog);
|
ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog);
|
||||||
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
|
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
|
||||||
Receive<ReplyMessage>(HandleReply);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleRegisterWeb(RegisterWebMessage message) {
|
private async Task HandleRegisterWeb(RegisterWebMessage message) {
|
||||||
@@ -189,8 +188,4 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) {
|
private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) {
|
||||||
return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count);
|
return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleReply(ReplyMessage message) {
|
|
||||||
connection.Receive(message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@@ -71,50 +71,70 @@ internal sealed class RpcClientConnector {
|
|||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
SslStream? stream = null;
|
SslStream? stream;
|
||||||
bool handledException = false;
|
|
||||||
try {
|
try {
|
||||||
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
|
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
|
||||||
|
|
||||||
try {
|
if (await FinalizeConnection(stream, cancellationToken)) {
|
||||||
loggedCertificateValidationError = false;
|
return new Connection(clientSocket, stream);
|
||||||
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
|
|
||||||
} catch (AuthenticationException e) {
|
|
||||||
if (!loggedCertificateValidationError) {
|
|
||||||
logger.Error(e, "Could not establish a secure connection.");
|
|
||||||
}
|
|
||||||
|
|
||||||
handledException = true;
|
|
||||||
throw;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Information("Established a secure connection.");
|
|
||||||
|
|
||||||
try {
|
|
||||||
await PerformApplicationHandshake(stream, cancellationToken);
|
|
||||||
} catch (EndOfStreamException) {
|
|
||||||
logger.Warning("Could not perform application handshake, connection lost.");
|
|
||||||
handledException = true;
|
|
||||||
throw;
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.Warning(e, "Could not perform application handshake.");
|
|
||||||
handledException = true;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Connection(clientSocket, stream);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (!handledException) {
|
logger.Error(e, "Caught unhandled exception.");
|
||||||
logger.Error(e, "Caught unhandled exception.");
|
stream = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await DisconnectSocket(clientSocket, stream);
|
||||||
|
} finally {
|
||||||
|
clientSocket.Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<bool> FinalizeConnection(SslStream stream, CancellationToken cancellationToken) {
|
||||||
|
try {
|
||||||
|
loggedCertificateValidationError = false;
|
||||||
|
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
|
||||||
|
} catch (AuthenticationException e) {
|
||||||
|
if (!loggedCertificateValidationError) {
|
||||||
|
logger.Error(e, "Could not establish a secure connection.");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
return false;
|
||||||
await DisconnectSocket(clientSocket, stream);
|
}
|
||||||
} finally {
|
|
||||||
clientSocket.Close();
|
logger.Information("Established a secure connection.");
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!await PerformApplicationHandshake(stream, cancellationToken)) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
} catch (EndOfStreamException) {
|
||||||
|
logger.Warning("Could not perform application handshake, connection lost.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.Warning(e, "Could not perform application handshake.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<bool> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
||||||
|
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
|
||||||
|
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
|
||||||
|
|
||||||
|
var result = (RpcHandshakeResult) await Serialization.ReadByte(stream, cancellationToken);
|
||||||
|
switch (result) {
|
||||||
|
case RpcHandshakeResult.Success:
|
||||||
|
return true;
|
||||||
|
|
||||||
return null;
|
case RpcHandshakeResult.InvalidAuthToken:
|
||||||
|
logger.Error("Server rejected authorization token.");
|
||||||
|
return false;
|
||||||
|
|
||||||
|
default:
|
||||||
|
logger.Error("Server rejected client due to unknown error: {ErrorId}", result);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,12 +162,6 @@ internal sealed class RpcClientConnector {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
|
||||||
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
|
|
||||||
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
|
|
||||||
// TODO read response
|
|
||||||
}
|
|
||||||
|
|
||||||
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
|
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
|
||||||
if (stream != null) {
|
if (stream != null) {
|
||||||
await stream.DisposeAsync();
|
await stream.DisposeAsync();
|
||||||
|
5
Utils/Phantom.Utils.Rpc/Runtime/RpcServerClient.cs
Normal file
5
Utils/Phantom.Utils.Rpc/Runtime/RpcServerClient.cs
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public sealed class RpcServerClient {
|
||||||
|
|
||||||
|
}
|
@@ -1,135 +1,135 @@
|
|||||||
using System.Collections.Concurrent;
|
// using System.Collections.Concurrent;
|
||||||
using Akka.Actor;
|
// using Akka.Actor;
|
||||||
using NetMQ.Sockets;
|
// using NetMQ.Sockets;
|
||||||
using Phantom.Utils.Actor;
|
// using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
// using Phantom.Utils.Logging;
|
||||||
using Serilog;
|
// using Serilog;
|
||||||
using Serilog.Events;
|
// using Serilog.Events;
|
||||||
|
//
|
||||||
namespace Phantom.Utils.Rpc.Runtime2;
|
// namespace Phantom.Utils.Rpc.Runtime2;
|
||||||
|
//
|
||||||
public static class RpcServerRuntime {
|
// public static class RpcServerRuntime {
|
||||||
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
// public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
||||||
RpcConfiguration config,
|
// RpcConfiguration config,
|
||||||
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
|
// IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
|
||||||
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
|
// IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
|
||||||
IActorRefFactory actorSystem,
|
// IActorRefFactory actorSystem,
|
||||||
CancellationToken cancellationToken
|
// CancellationToken cancellationToken
|
||||||
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
// ) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||||
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
|
// return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
// internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
||||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
|
// internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
|
||||||
var socket = RpcServerSocket.Connect(config);
|
// var socket = RpcServerSocket.Connect(config);
|
||||||
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
|
// return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private readonly string serviceName;
|
// private readonly string serviceName;
|
||||||
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
// private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
||||||
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
// private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
||||||
private readonly IActorRefFactory actorSystem;
|
// private readonly IActorRefFactory actorSystem;
|
||||||
private readonly CancellationToken cancellationToken;
|
// private readonly CancellationToken cancellationToken;
|
||||||
|
//
|
||||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
|
// private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||||
this.serviceName = socket.Config.ServiceName;
|
// this.serviceName = socket.Config.ServiceName;
|
||||||
this.messageDefinitions = messageDefinitions;
|
// this.messageDefinitions = messageDefinitions;
|
||||||
this.registrationHandler = registrationHandler;
|
// this.registrationHandler = registrationHandler;
|
||||||
this.actorSystem = actorSystem;
|
// this.actorSystem = actorSystem;
|
||||||
this.cancellationToken = cancellationToken;
|
// this.cancellationToken = cancellationToken;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private protected override Task Run(ServerSocket socket) {
|
// private protected override Task Run(ServerSocket socket) {
|
||||||
var clients = new ConcurrentDictionary<ulong, Client>();
|
// var clients = new ConcurrentDictionary<ulong, Client>();
|
||||||
|
//
|
||||||
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
// void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||||
if (clients.Remove(e.RoutingId, out var client)) {
|
// if (clients.Remove(e.RoutingId, out var client)) {
|
||||||
client.Connection.Closed -= OnConnectionClosed;
|
// client.Connection.Closed -= OnConnectionClosed;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
while (!cancellationToken.IsCancellationRequested) {
|
// while (!cancellationToken.IsCancellationRequested) {
|
||||||
var (routingId, data) = socket.Receive(cancellationToken);
|
// var (routingId, data) = socket.Receive(cancellationToken);
|
||||||
|
//
|
||||||
if (data.Length == 0) {
|
// if (data.Length == 0) {
|
||||||
LogUnknownMessage(routingId, data);
|
// LogUnknownMessage(routingId, data);
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
// Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
||||||
if (messageType == null) {
|
// if (messageType == null) {
|
||||||
LogUnknownMessage(routingId, data);
|
// LogUnknownMessage(routingId, data);
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
if (!clients.TryGetValue(routingId, out var client)) {
|
// if (!clients.TryGetValue(routingId, out var client)) {
|
||||||
if (messageType != typeof(TRegistrationMessage)) {
|
// if (messageType != typeof(TRegistrationMessage)) {
|
||||||
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
|
// RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
var clientLoggerName = LoggerName + ":" + routingId;
|
// var clientLoggerName = LoggerName + ":" + routingId;
|
||||||
var clientActorName = "Rpc-" + serviceName + "-" + routingId;
|
// var clientActorName = "Rpc-" + serviceName + "-" + routingId;
|
||||||
|
//
|
||||||
// TODO add pings and tear down connection after too much inactivity
|
// // TODO add pings and tear down connection after too much inactivity
|
||||||
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
// var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||||
connection.Closed += OnConnectionClosed;
|
// connection.Closed += OnConnectionClosed;
|
||||||
|
//
|
||||||
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
|
// client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
|
||||||
clients[routingId] = client;
|
// clients[routingId] = client;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
client.Enqueue(messageType, data);
|
// client.Enqueue(messageType, data);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
foreach (var client in clients.Values) {
|
// foreach (var client in clients.Values) {
|
||||||
client.Connection.Close();
|
// client.Connection.Close();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
return Task.CompletedTask;
|
// return Task.CompletedTask;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
|
// private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
|
||||||
RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
|
// RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private protected override Task Disconnect(ServerSocket socket) {
|
// private protected override Task Disconnect(ServerSocket socket) {
|
||||||
return Task.CompletedTask;
|
// return Task.CompletedTask;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private sealed class Client {
|
// private sealed class Client {
|
||||||
public RpcConnectionToClient<TClientMessage> Connection { get; }
|
// public RpcConnectionToClient<TClientMessage> Connection { get; }
|
||||||
|
//
|
||||||
private readonly ILogger logger;
|
// private readonly ILogger logger;
|
||||||
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
|
// private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
|
||||||
|
//
|
||||||
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
|
// public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
|
||||||
this.Connection = connection;
|
// this.Connection = connection;
|
||||||
this.Connection.Closed += OnConnectionClosed;
|
// this.Connection.Closed += OnConnectionClosed;
|
||||||
|
//
|
||||||
this.logger = PhantomLogger.Create(loggerName);
|
// this.logger = PhantomLogger.Create(loggerName);
|
||||||
|
//
|
||||||
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
|
// var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
|
||||||
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
|
// this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
// internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||||
LogMessageType(messageType, data);
|
// LogMessageType(messageType, data);
|
||||||
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
// receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
// private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||||
if (logger.IsEnabled(LogEventLevel.Verbose)) {
|
// if (logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||||
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
|
// logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
// private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||||
Connection.Closed -= OnConnectionClosed;
|
// Connection.Closed -= OnConnectionClosed;
|
||||||
|
//
|
||||||
logger.Debug("Closing connection...");
|
// logger.Debug("Closing connection...");
|
||||||
receiverActor.Stop();
|
// receiverActor.Stop();
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
@@ -74,10 +74,6 @@ sealed class Base24 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public byte[] Decode(ReadOnlySpan<char> data) {
|
public byte[] Decode(ReadOnlySpan<char> data) {
|
||||||
if (data == null) {
|
|
||||||
throw new ArgumentNullException(nameof(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.Length % 7 != 0) {
|
if (data.Length % 7 != 0) {
|
||||||
throw new ArgumentException("The data length must be multiple of 7 chars.");
|
throw new ArgumentException("The data length must be multiple of 7 chars.");
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user