1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-17 12:24:49 +02:00

2 Commits

Author SHA1 Message Date
a8fe48ec77 Replace NetMQ with custom TCP logic 2025-09-12 01:15:20 +02:00
62d10f248d Replace NetMQ with custom TCP logic 2025-09-11 20:20:16 +02:00
12 changed files with 203 additions and 227 deletions

View File

@@ -4,7 +4,6 @@ using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent;
@@ -53,24 +52,15 @@ public sealed class AgentServices {
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
Logger.Information("Registering with the controller...");
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult;
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
try {
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
} catch (Exception e) {
Logger.Fatal(e, "Registration failed.");
return false;
}
if (!registrationResult) {
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
_ => "Unknown error " + (byte) registrationResult.Error + ".",
});
return false;
}
foreach (var configureInstanceMessage in registrationResult.Value) {
foreach (var configureInstanceMessage in configureInstanceMessages) {
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
configureInstanceMessage.InstanceGuid,
configureInstanceMessage.Configuration,

View File

@@ -8,7 +8,7 @@ public readonly record struct ConnectionKey(RpcCertificateThumbprint Certificate
public byte[] ToBytes() {
Span<byte> result = stackalloc byte[TokenLength + CertificateThumbprint.Bytes.Length];
AuthToken.WriteTo(result[..TokenLength]);
AuthToken.Bytes.CopyTo(result[..TokenLength]);
CertificateThumbprint.Bytes.CopyTo(result[TokenLength..]);
return result.ToArray();
}

View File

@@ -1,5 +0,0 @@
namespace Phantom.Common.Data.Replies;
public enum RegisterAgentError : byte {
ConnectionAlreadyHasAnAgent = 0,
}

View File

@@ -1,4 +1,5 @@
using Phantom.Common.Data;
using System.Collections.Immutable;
using Phantom.Common.Data;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
@@ -19,7 +20,7 @@ public static class AgentMessageRegistries {
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
ToAgent.Add<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(5);
ToController.Add<RegisterAgentMessage>(0);
ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
ToController.Add<UnregisterAgentMessage>(1);
ToController.Add<AgentIsAliveMessage>(2);
ToController.Add<AdvertiseJavaRuntimesMessage>(3);

View File

@@ -1,9 +0,0 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
) : IMessageToAgent;

View File

@@ -1,9 +0,0 @@
using System.Collections.Immutable;
using MemoryPack;
namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentSuccessMessage(
[property: MemoryPackOrder(0)] ImmutableArray<ConfigureInstanceMessage> InitialInstanceConfigurations
) : IMessageToAgent;

View File

@@ -1,8 +1,6 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Actor;
@@ -11,4 +9,4 @@ namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>;
) : IMessageToController, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;

View File

@@ -87,7 +87,6 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
ReceiveAndReply<GetAgentJavaRuntimesMessage, ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>(HandleGetAgentJavaRuntimes);
ReceiveAndReplyLater<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(HandleGetAuditLog);
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterWeb(RegisterWebMessage message) {
@@ -189,8 +188,4 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
private Task<Result<ImmutableArray<EventLogItem>, UserActionFailure>> HandleGetEventLog(GetEventLogMessage message) {
return eventLogManager.GetMostRecentItems(userLoginManager.GetLoggedInUser(message.AuthToken), message.Count);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@@ -71,41 +71,16 @@ internal sealed class RpcClientConnector {
throw;
}
SslStream? stream = null;
bool handledException = false;
SslStream? stream;
try {
stream = new SslStream(new NetworkStream(clientSocket, ownsSocket: false), leaveInnerStreamOpen: false);
try {
loggedCertificateValidationError = false;
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
} catch (AuthenticationException e) {
if (!loggedCertificateValidationError) {
logger.Error(e, "Could not establish a secure connection.");
}
handledException = true;
throw;
}
logger.Information("Established a secure connection.");
try {
await PerformApplicationHandshake(stream, cancellationToken);
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
handledException = true;
throw;
} catch (Exception e) {
logger.Warning(e, "Could not perform application handshake.");
handledException = true;
throw;
}
if (await FinalizeConnection(stream, cancellationToken)) {
return new Connection(clientSocket, stream);
}
} catch (Exception e) {
if (!handledException) {
logger.Error(e, "Caught unhandled exception.");
stream = null;
}
try {
@@ -116,6 +91,51 @@ internal sealed class RpcClientConnector {
return null;
}
private async Task<bool> FinalizeConnection(SslStream stream, CancellationToken cancellationToken) {
try {
loggedCertificateValidationError = false;
await stream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
} catch (AuthenticationException e) {
if (!loggedCertificateValidationError) {
logger.Error(e, "Could not establish a secure connection.");
}
return false;
}
logger.Information("Established a secure connection.");
try {
if (!await PerformApplicationHandshake(stream, cancellationToken)) {
return false;
}
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
} catch (Exception e) {
logger.Warning(e, "Could not perform application handshake.");
}
return true;
}
private async Task<bool> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
var result = (RpcHandshakeResult) await Serialization.ReadByte(stream, cancellationToken);
switch (result) {
case RpcHandshakeResult.Success:
return true;
case RpcHandshakeResult.InvalidAuthToken:
logger.Error("Server rejected authorization token.");
return false;
default:
logger.Error("Server rejected client due to unknown error: {ErrorId}", result);
return false;
}
}
private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) {
@@ -142,12 +162,6 @@ internal sealed class RpcClientConnector {
return false;
}
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
// TODO read response
}
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
if (stream != null) {
await stream.DisposeAsync();

View File

@@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcServerClient {
}

View File

@@ -1,135 +1,135 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime2;
public static class RpcServerRuntime {
public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
RpcConfiguration config,
IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
IActorRefFactory actorSystem,
CancellationToken cancellationToken
) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
}
private readonly string serviceName;
private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
private readonly IActorRefFactory actorSystem;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.registrationHandler = registrationHandler;
this.actorSystem = actorSystem;
this.cancellationToken = cancellationToken;
}
private protected override Task Run(ServerSocket socket) {
var clients = new ConcurrentDictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
if (clients.Remove(e.RoutingId, out var client)) {
client.Connection.Closed -= OnConnectionClosed;
}
}
while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) {
LogUnknownMessage(routingId, data);
continue;
}
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (messageType == null) {
LogUnknownMessage(routingId, data);
continue;
}
if (!clients.TryGetValue(routingId, out var client)) {
if (messageType != typeof(TRegistrationMessage)) {
RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
continue;
}
var clientLoggerName = LoggerName + ":" + routingId;
var clientActorName = "Rpc-" + serviceName + "-" + routingId;
// TODO add pings and tear down connection after too much inactivity
var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
clients[routingId] = client;
}
client.Enqueue(messageType, data);
}
foreach (var client in clients.Values) {
client.Connection.Close();
}
return Task.CompletedTask;
}
private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
}
private protected override Task Disconnect(ServerSocket socket) {
return Task.CompletedTask;
}
private sealed class Client {
public RpcConnectionToClient<TClientMessage> Connection { get; }
private readonly ILogger logger;
private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.logger = PhantomLogger.Create(loggerName);
var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
if (logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
}
}
private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
Connection.Closed -= OnConnectionClosed;
logger.Debug("Closing connection...");
receiverActor.Stop();
}
}
}
// using System.Collections.Concurrent;
// using Akka.Actor;
// using NetMQ.Sockets;
// using Phantom.Utils.Actor;
// using Phantom.Utils.Logging;
// using Serilog;
// using Serilog.Events;
//
// namespace Phantom.Utils.Rpc.Runtime2;
//
// public static class RpcServerRuntime {
// public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
// RpcConfiguration config,
// IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
// IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
// IActorRefFactory actorSystem,
// CancellationToken cancellationToken
// ) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
// return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
// }
// }
//
// internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
// internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
// var socket = RpcServerSocket.Connect(config);
// return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
// }
//
// private readonly string serviceName;
// private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
// private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
// private readonly IActorRefFactory actorSystem;
// private readonly CancellationToken cancellationToken;
//
// private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
// this.serviceName = socket.Config.ServiceName;
// this.messageDefinitions = messageDefinitions;
// this.registrationHandler = registrationHandler;
// this.actorSystem = actorSystem;
// this.cancellationToken = cancellationToken;
// }
//
// private protected override Task Run(ServerSocket socket) {
// var clients = new ConcurrentDictionary<ulong, Client>();
//
// void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
// if (clients.Remove(e.RoutingId, out var client)) {
// client.Connection.Closed -= OnConnectionClosed;
// }
// }
//
// while (!cancellationToken.IsCancellationRequested) {
// var (routingId, data) = socket.Receive(cancellationToken);
//
// if (data.Length == 0) {
// LogUnknownMessage(routingId, data);
// continue;
// }
//
// Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
// if (messageType == null) {
// LogUnknownMessage(routingId, data);
// continue;
// }
//
// if (!clients.TryGetValue(routingId, out var client)) {
// if (messageType != typeof(TRegistrationMessage)) {
// RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
// continue;
// }
//
// var clientLoggerName = LoggerName + ":" + routingId;
// var clientActorName = "Rpc-" + serviceName + "-" + routingId;
//
// // TODO add pings and tear down connection after too much inactivity
// var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
// connection.Closed += OnConnectionClosed;
//
// client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
// clients[routingId] = client;
// }
//
// client.Enqueue(messageType, data);
// }
//
// foreach (var client in clients.Values) {
// client.Connection.Close();
// }
//
// return Task.CompletedTask;
// }
//
// private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
// RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
// }
//
// private protected override Task Disconnect(ServerSocket socket) {
// return Task.CompletedTask;
// }
//
// private sealed class Client {
// public RpcConnectionToClient<TClientMessage> Connection { get; }
//
// private readonly ILogger logger;
// private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
//
// public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
// this.Connection = connection;
// this.Connection.Closed += OnConnectionClosed;
//
// this.logger = PhantomLogger.Create(loggerName);
//
// var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
// this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
// }
//
// internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
// LogMessageType(messageType, data);
// receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
// }
//
// private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
// if (logger.IsEnabled(LogEventLevel.Verbose)) {
// logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
// }
// }
//
// private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
// Connection.Closed -= OnConnectionClosed;
//
// logger.Debug("Closing connection...");
// receiverActor.Stop();
// }
// }
// }

View File

@@ -74,10 +74,6 @@ sealed class Base24 {
}
public byte[] Decode(ReadOnlySpan<char> data) {
if (data == null) {
throw new ArgumentNullException(nameof(data));
}
if (data.Length % 7 != 0) {
throw new ArgumentException("The data length must be multiple of 7 chars.");
}