1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-17 12:24:49 +02:00

2 Commits

Author SHA1 Message Date
3f8e46ae2d Replace NetMQ with custom TCP logic 2025-09-11 19:44:43 +02:00
584e8acfd0 Replace NetMQ with custom TCP logic 2025-09-11 10:07:57 +02:00
39 changed files with 207 additions and 211 deletions

View File

@@ -1,9 +1,14 @@
using Akka.Actor;
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
@@ -15,6 +20,7 @@ public sealed class AgentServices {
public ActorSystem ActorSystem { get; }
private AgentInfo AgentInfo { get; }
private AgentFolders AgentFolders { get; }
private AgentState AgentState { get; }
private BackupManager BackupManager { get; }
@@ -26,6 +32,7 @@ public sealed class AgentServices {
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) {
this.ActorSystem = ActorSystemFactory.Create("Agent");
this.AgentInfo = agentInfo;
this.AgentFolders = agentFolders;
this.AgentState = new AgentState();
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
@@ -43,6 +50,48 @@ public sealed class AgentServices {
}
}
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
Logger.Information("Registering with the controller...");
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult;
try {
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
} catch (Exception e) {
Logger.Fatal(e, "Registration failed.");
return false;
}
if (!registrationResult) {
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
_ => "Unknown error " + (byte) registrationResult.Error + ".",
});
return false;
}
foreach (var configureInstanceMessage in registrationResult.Value) {
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
configureInstanceMessage.InstanceGuid,
configureInstanceMessage.Configuration,
configureInstanceMessage.LaunchProperties,
configureInstanceMessage.LaunchNow,
AlwaysReportStatus: true
);
var configureInstanceResult = await InstanceManager.Request(configureInstanceCommand, cancellationToken);
if (!configureInstanceResult.Is(ConfigureInstanceResult.Success)) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configureInstanceMessage.Configuration.InstanceName, configureInstanceMessage.InstanceGuid);
return false;
}
}
await connection.Send(new AdvertiseJavaRuntimesMessage(JavaRuntimeRepository.All), cancellationToken);
InstanceTicketManager.RefreshAgentStatus();
return true;
}
public async Task Shutdown() {
Logger.Information("Stopping services...");

View File

@@ -1,15 +1,20 @@
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Agent.Services.Rpc;
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) {
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return sendChannel.SendMessage(message, CancellationToken.None /* TODO */);
public ValueTask Send<TMessage>(TMessage message, CancellationToken cancellationToken) where TMessage : IMessageToController {
return sendChannel.SendMessage(message, cancellationToken);
}
// TODO handle properly
public bool TrySend<TMessage>(TMessage message) where TMessage : IMessageToController {
return sendChannel.TrySendMessage(message);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
return sendChannel.SendMessage<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
}
}

View File

@@ -1,82 +1,32 @@
using Phantom.Agent.Services.Instances;
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Agent.Services.Rpc;
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent> {
private static ILogger Logger { get; } = PhantomLogger.Create<ControllerMessageHandlerActor>();
public readonly record struct Init(RpcSendChannel<IMessageToController> SendChannel, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
public readonly record struct Init(AgentServices Agent);
public static Props<IMessageToAgent> Factory(Init init) {
return Props<IMessageToAgent>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcSendChannel<IMessageToController> sendChannel;
private readonly AgentServices agent;
private readonly CancellationTokenSource shutdownTokenSource;
private ControllerMessageHandlerActor(Init init) {
this.sendChannel = init.SendChannel;
this.agent = init.Agent;
this.shutdownTokenSource = init.ShutdownTokenSource;
ReceiveAsync<RegisterAgentSuccessMessage>(HandleRegisterAgentSuccess);
Receive<RegisterAgentFailureMessage>(HandleRegisterAgentFailure);
ReceiveAndReplyLater<ConfigureInstanceMessage, Result<ConfigureInstanceResult, InstanceActionFailure>>(HandleConfigureInstance);
ReceiveAndReplyLater<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(HandleLaunchInstance);
ReceiveAndReplyLater<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(HandleStopInstance);
ReceiveAndReplyLater<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(HandleSendCommandToInstance);
}
private async Task HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, instanceGuid);
shutdownTokenSource.Cancel();
}
foreach (var configureInstanceMessage in message.InitialInstanceConfigurations) {
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
if (!result.Is(ConfigureInstanceResult.Success)) {
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
return;
}
}
await sendChannel.SendMessage(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All), CancellationToken.None);
agent.InstanceTicketManager.RefreshAgentStatus();
}
private void HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
string errorMessage = message.FailureKind switch {
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
RegisterAgentFailure.InvalidToken => "Invalid token.",
_ => "Unknown error " + (byte) message.FailureKind + "."
};
Logger.Fatal("Agent authentication failed: {Error}", errorMessage);
PhantomLogger.Dispose();
Environment.Exit(1);
}
private Task<Result<ConfigureInstanceResult, InstanceActionFailure>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
return agent.InstanceManager.Request(new InstanceManagerActor.ConfigureInstanceCommand(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus));
}
private async Task<Result<ConfigureInstanceResult, InstanceActionFailure>> HandleConfigureInstance(ConfigureInstanceMessage message) {
return await HandleConfigureInstance(message, alwaysReportStatus: false);
return await agent.InstanceManager.Request(new InstanceManagerActor.ConfigureInstanceCommand(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, AlwaysReportStatus: false));
}
private async Task<Result<LaunchInstanceResult, InstanceActionFailure>> HandleLaunchInstance(LaunchInstanceMessage message) {

View File

@@ -1,7 +0,0 @@
using Phantom.Common.Data;
namespace Phantom.Agent.Services.Rpc;
sealed class RpcClientAgentHandshake(AuthToken authToken) {
}

View File

@@ -46,41 +46,45 @@ try {
return 1;
}
var (certificateThumbprint, authToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var rpcClientConnectionParameters = new RpcClientConnectionParameters(
Host: controllerHost,
Port: controllerPort,
DistinguishedName: "phantom-controller",
CertificateThumbprint: certificateThumbprint,
CertificateThumbprint: agentKey.Value.CertificateThumbprint,
AuthToken: agentKey.Value.AuthToken,
SendQueueCapacity: 500,
PingInterval: TimeSpan.FromSeconds(10)
);
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, AgentMessageRegistries.Definitions, shutdownCancellationToken);
if (rpcClient == null) {
return 1;
}
var controllerConnection = new ControllerConnection(rpcClient.SendChannel);
Task? rpcClientListener = null;
try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), controllerConnection);
await agentServices.Initialize();
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource);
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(agentServices);
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
if (await agentServices.Register(controllerConnection, shutdownCancellationToken)) {
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
}
await agentServices.Shutdown();
} finally {
try {
await rpcClient.SendChannel.SendMessage(new UnregisterAgentMessage(), CancellationToken.None);
await controllerConnection.Send(new UnregisterAgentMessage(), CancellationToken.None);
// TODO wait for acknowledgment
} catch (Exception e) {
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");

View File

@@ -1,4 +1,5 @@
using Phantom.Utils.Rpc.Runtime.Tls;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Runtime.Tls;
namespace Phantom.Common.Data;

View File

@@ -0,0 +1,5 @@
namespace Phantom.Common.Data.Replies;
public enum RegisterAgentError : byte {
ConnectionAlreadyHasAnAgent = 0,
}

View File

@@ -1,6 +0,0 @@
namespace Phantom.Common.Data.Replies;
public enum RegisterAgentFailure : byte {
ConnectionAlreadyHasAnAgent,
InvalidToken
}

View File

@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using MemoryPack;
using Phantom.Utils.Monads;
using Phantom.Utils.Result;
namespace Phantom.Common.Data;
@@ -24,6 +25,9 @@ public sealed partial class Result<TValue, TError> {
[MemoryPackIgnore]
public TError Error => !hasValue ? error! : throw new InvalidOperationException("Attempted to get error from a success result.");
[MemoryPackIgnore]
public Either<TValue, TError> AsEither => hasValue ? Either.Left(value!) : Either.Right(error!);
private Result(bool hasValue, TValue? value, TError? error) {
this.hasValue = hasValue;
this.value = value;

View File

@@ -14,8 +14,6 @@ public static class AgentMessageRegistries {
public static IMessageDefinitions<IMessageToController, IMessageToAgent> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
ToAgent.Add<RegisterAgentFailureMessage>(1);
ToAgent.Add<ConfigureInstanceMessage, Result<ConfigureInstanceResult, InstanceActionFailure>>(2);
ToAgent.Add<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(3);
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);

View File

@@ -5,5 +5,5 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
) : IMessageToAgent;

View File

@@ -1,11 +1,14 @@
using MemoryPack;
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Actor;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken,
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
) : IMessageToController;
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>;

View File

@@ -1,9 +0,0 @@
using MemoryPack;
using Phantom.Common.Data;
namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterWebMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken
) : IMessageToController;

View File

@@ -21,7 +21,6 @@ public static class WebMessageRegistries {
public static IMessageDefinitions<IMessageToController, IMessageToWeb> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() {
ToController.Add<RegisterWebMessage>(0);
ToController.Add<UnregisterWebMessage>(1);
ToController.Add<LogInMessage, Optional<LogInSuccess>>(2);
ToController.Add<LogOutMessage>(3);
@@ -42,7 +41,6 @@ public static class WebMessageRegistries {
ToController.Add<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(18);
ToController.Add<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(19);
ToWeb.Add<RegisterWebResultMessage>(0);
ToWeb.Add<RefreshAgentsMessage>(1);
ToWeb.Add<RefreshInstancesMessage>(2);
ToWeb.Add<InstanceOutputMessage>(3);

View File

@@ -13,6 +13,7 @@ using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Users.Sessions;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Controller.Services.Agents;
@@ -64,7 +65,7 @@ sealed class AgentManager {
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgent> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.InvalidToken));
return false;
}

View File

@@ -1,5 +1,4 @@
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Common.Messages.Web;
@@ -13,7 +12,7 @@ using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users;
using Phantom.Controller.Services.Users.Sessions;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime2;
using Phantom.Utils.Rpc;
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;

View File

@@ -40,12 +40,11 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
Receive<ReportInstancePlayerCountsMessage>(HandleReportInstancePlayerCounts);
Receive<ReportInstanceEventMessage>(HandleReportInstanceEvent);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<ReplyMessage>(HandleReply);
}
private async Task HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuid != message.AgentInfo.AgentGuid) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.ConnectionAlreadyHasAnAgent));
}
else {
await agentRegistrationHandler.TryRegisterImpl(connection, message);
@@ -84,8 +83,4 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
private void HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@@ -1,34 +0,0 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent.Handshake;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Rpc;
public sealed class RpcServerAgentHandshake(AuthToken authToken) : RpcServerHandshake {
private static readonly ILogger Logger = PhantomLogger.Create<RpcServerAgentHandshake>();
protected override async Task<bool> AcceptClient(string remoteAddress, Stream stream, CancellationToken cancellationToken) {
Memory<byte> buffer = new Memory<byte>(new byte[AuthToken.Length]);
await stream.ReadExactlyAsync(buffer, cancellationToken: cancellationToken);
if (!authToken.FixedTimeEquals(buffer.Span)) {
Logger.Warning("Rejected client {}, invalid authorization token.", remoteAddress);
await Respond(remoteAddress, stream, new InvalidAuthToken(), cancellationToken);
return false;
}
AgentInfo agentInfo = await Serialization.Deserialize<AgentInfo>(stream, cancellationToken);
return true;
}
private async ValueTask Respond(string remoteAddress, Stream stream, IAgentHandshakeResult result, CancellationToken cancellationToken) {
try {
await Serialization.Serialize(result, stream, cancellationToken);
} catch (Exception e) {
Logger.Error(e, "Could not send handshake result to client {}.", remoteAddress);
}
}
}

View File

@@ -1,5 +1,4 @@
using Phantom.Common.Data;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft;
@@ -10,7 +9,7 @@ using Phantom.Controller.Services.Users;
using Phantom.Controller.Services.Users.Sessions;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime2;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Controller.Services.Rpc;

View File

@@ -1,5 +1,5 @@
using Phantom.Common.Data;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Runtime.Tls;
namespace Phantom.Controller;

View File

@@ -3,7 +3,7 @@ using Phantom.Utils.Cryptography;
using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Monads;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Runtime.Tls;
using Serilog;

View File

@@ -2,7 +2,6 @@
using Phantom.Controller;
using Phantom.Controller.Database.Postgres;
using Phantom.Controller.Services;
using Phantom.Controller.Services.Rpc;
using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
@@ -58,8 +57,8 @@ try {
await controllerServices.Initialize();
LinkedTasks<bool> rpcServerTasks = new LinkedTasks<bool>([
new RpcServer("Agent", agentRpcServerHost, agentKeyData.Certificate, new RpcServerAgentHandshake(agentKeyData.AuthToken)).Run(shutdownCancellationToken),
// new RpcServer("Web", webRpcServerHost, webKeyData.Certificate).Run(shutdownCancellationToken),
new RpcServer("Agent", agentRpcServerHost, agentKeyData.AuthToken, agentKeyData.Certificate).Run(shutdownCancellationToken),
new RpcServer("Web", webRpcServerHost, agentKeyData.AuthToken, webKeyData.Certificate).Run(shutdownCancellationToken),
]);
// If either RPC server crashes, stop the whole process.

View File

@@ -1,18 +1,12 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using MemoryPack;
namespace Phantom.Common.Data;
namespace Phantom.Utils.Rpc;
[MemoryPackable(GenerateType.VersionTolerant)]
[SuppressMessage("ReSharper", "MemberCanBePrivate.Global")]
public sealed partial class AuthToken {
public sealed class AuthToken {
public const int Length = 12;
[MemoryPackOrder(0)]
[MemoryPackInclude]
public readonly ImmutableArray<byte> Bytes;
public ImmutableArray<byte> Bytes { get; }
public AuthToken(ImmutableArray<byte> bytes) {
if (bytes.Length != Length) {
@@ -30,10 +24,6 @@ public sealed partial class AuthToken {
return CryptographicOperations.FixedTimeEquals(Bytes.AsSpan(), other);
}
internal void WriteTo(Span<byte> span) {
Bytes.CopyTo(span);
}
public static AuthToken Generate() {
return new AuthToken([..RandomNumberGenerator.GetBytes(Length)]);
}

View File

@@ -29,7 +29,7 @@ sealed record MessageFrame(uint MessageId, ushort RegistryCode, ReadOnlyMemory<b
private static void CheckMessageLength(int messageLength) {
if (messageLength < 0) {
throw new RpcErrorException("Message length is negative", RpcError.InvalidData);
throw new RpcErrorException("Message length is negative.", RpcError.InvalidData);
}
if (messageLength > MaxMessageBytes) {

View File

@@ -27,7 +27,7 @@ sealed record ReplyFrame(uint ReplyingToMessageId, ReadOnlyMemory<byte> Serializ
private static void CheckReplyLength(int replyLength) {
if (replyLength < 0) {
throw new RpcErrorException("Reply length is negative", RpcError.InvalidData);
throw new RpcErrorException("Reply length is negative.", RpcError.InvalidData);
}
if (replyLength > MaxReplyBytes) {

View File

@@ -12,7 +12,7 @@ public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
if (HasReplyType(typeof(TMessage))) {
throw new ArgumentException("This overload is for messages without a reply");
throw new ArgumentException("This overload is for messages without a reply.");
}
AddTypeCodeMapping<TMessage>(code);

View File

@@ -8,8 +8,8 @@ using Serilog;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters, handshake);
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters);
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
}
@@ -85,8 +85,6 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
logger.Error(e, "Caught exception while closing connection.");
}
// TODO disconnection handshake?
logger.Information("Client shut down.");
}

View File

@@ -7,6 +7,7 @@ public readonly record struct RpcClientConnectionParameters(
ushort Port,
string DistinguishedName,
RpcCertificateThumbprint CertificateThumbprint,
AuthToken AuthToken,
ushort SendQueueCapacity,
TimeSpan PingInterval
);

View File

@@ -15,16 +15,16 @@ internal sealed class RpcClientConnector {
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
private readonly ILogger logger;
private readonly Guid sessionId;
private readonly RpcClientConnectionParameters parameters;
private readonly RpcClientHandshake handshake;
private readonly SslClientAuthenticationOptions sslOptions;
private bool loggedCertificateValidationError = false;
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters, RpcClientHandshake handshake) {
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters) {
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
this.sessionId = Guid.NewGuid();
this.parameters = parameters;
this.handshake = handshake;
this.sslOptions = new SslClientAuthenticationOptions {
AllowRenegotiation = false,
@@ -91,7 +91,7 @@ internal sealed class RpcClientConnector {
logger.Information("Established a secure connection.");
try {
await handshake.AcceptServer(stream, cancellationToken);
await PerformApplicationHandshake(stream, cancellationToken);
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
handledException = true;
@@ -142,6 +142,12 @@ internal sealed class RpcClientConnector {
return false;
}
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
// TODO read response
}
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
if (stream != null) {
await stream.DisposeAsync();

View File

@@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcClientHandshake {
protected internal abstract Task<bool> AcceptServer(Stream stream, CancellationToken cancellationToken);
}

View File

@@ -1,6 +1,6 @@
namespace Phantom.Utils.Rpc.Runtime;
public enum RpcHandshakeResult : byte {
UnknownError = 0,
InvalidFormat = 1,
Success = 0,
InvalidAuthToken = 1,
}

View File

@@ -3,12 +3,13 @@ using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using Phantom.Utils.Logging;
using Phantom.Utils.Monads;
using Phantom.Utils.Rpc.Runtime.Tls;
using Serilog;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCertificate certificate, RpcServerHandshake handshake) {
public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken authToken, RpcServerCertificate certificate) {
private readonly ILogger logger = PhantomLogger.Create<RpcServer>(loggerName);
private readonly List<Client> clients = [];
@@ -39,7 +40,7 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
while (!shutdownToken.IsCancellationRequested) {
Socket clientSocket = await serverSocket.AcceptAsync(shutdownToken);
clients.Add(new Client(logger, clientSocket, sslOptions, handshake, shutdownToken));
clients.Add(new Client(logger, clientSocket, sslOptions, authToken, shutdownToken));
clients.RemoveAll(static client => client.Task.IsCompleted);
}
} finally {
@@ -77,17 +78,19 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
public Task Task { get; }
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
private readonly ILogger logger;
private readonly Socket socket;
private readonly SslServerAuthenticationOptions sslOptions;
private readonly RpcServerHandshake handshake;
private readonly AuthToken authToken;
private readonly CancellationToken shutdownToken;
public Client(ILogger logger, Socket socket, SslServerAuthenticationOptions sslOptions, RpcServerHandshake handshake, CancellationToken shutdownToken) {
public Client(ILogger logger, Socket socket, SslServerAuthenticationOptions sslOptions, AuthToken authToken, CancellationToken shutdownToken) {
this.logger = logger;
this.socket = socket;
this.sslOptions = sslOptions;
this.handshake = handshake;
this.authToken = authToken;
this.shutdownToken = shutdownToken;
this.Task = Run();
}
@@ -98,13 +101,18 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
try {
await stream.AuthenticateAsServerAsync(sslOptions, shutdownToken);
} catch (OperationCanceledException) {
throw;
} catch (Exception e) {
logger.Error(e, "Could not establish a secure connection.");
return;
}
Either<Guid, RpcHandshakeResult> handshakeResult;
try {
await handshake.AcceptClient(socket.RemoteEndPoint?.ToString() ?? "<unknown address>", stream, shutdownToken);
handshakeResult = await PerformApplicationHandshake(stream, shutdownToken);
} catch (OperationCanceledException) {
throw;
} catch (EndOfStreamException) {
logger.Warning("Could not perform application handshake, connection lost.");
return;
@@ -113,9 +121,17 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
return;
}
byte[] buffer = new byte[1024];
int readBytes;
while ((readBytes = await stream.ReadAsync(buffer, shutdownToken)) > 0) {}
switch (handshakeResult) {
case Left<Guid, RpcHandshakeResult>(var sessionId):
await Serialization.WriteByte((byte) RpcHandshakeResult.Success, stream, shutdownToken);
break;
case Right<Guid, RpcHandshakeResult>(var error):
await Serialization.WriteByte((byte) error, stream, shutdownToken);
break;
}
} catch (OperationCanceledException) {
logger.Warning("Cancelling an incoming client due to shutdown.");
} finally {
try {
using var timeoutTokenSource = new CancellationTokenSource(DisconnectTimeout);
@@ -129,5 +145,16 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
}
}
}
private async Task<Either<Guid, RpcHandshakeResult>> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
var clientAuthToken = await Serialization.ReadAuthToken(stream, cancellationToken);
if (!authToken.FixedTimeEquals(clientAuthToken)) {
logger.Warning("Rejected client {}, invalid authorization token.", Address);
return Either.Right(RpcHandshakeResult.InvalidAuthToken);
}
var sessionId = await Serialization.ReadGuid(stream, cancellationToken);
return Either.Left(sessionId);
}
}
}

View File

@@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcServerHandshake {
protected internal abstract Task<bool> AcceptClient(string remoteAddress, Stream stream, CancellationToken cancellationToken);
}

View File

@@ -1,8 +1,7 @@
using System.Security.Cryptography.X509Certificates;
using Phantom.Utils.Monads;
using Phantom.Utils.Rpc.Runtime.Tls;
namespace Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Runtime.Tls;
public sealed class RpcServerCertificate {
public static byte[] CreateAndExport(string commonName) {

View File

@@ -5,50 +5,73 @@ using MemoryPack;
namespace Phantom.Utils.Rpc;
static class Serialization {
private const int GuidBytes = 16;
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
private static async ValueTask WriteValue<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
using var buffer = RentedMemory<byte>.Rent(size);
writer(buffer.AsSpan, value);
await stream.WriteAsync(buffer.AsMemory, cancellationToken);
}
private static async ValueTask<T> ReadPrimitive<T>(Func<ReadOnlySpan<byte>, T> reader, int size, Stream stream, CancellationToken cancellationToken) {
private static async ValueTask<T> ReadValue<T>(Func<ReadOnlySpan<byte>, T> reader, int size, Stream stream, CancellationToken cancellationToken) {
using var buffer = RentedMemory<byte>.Rent(size);
await stream.ReadExactlyAsync(buffer.AsMemory, cancellationToken);
return reader(buffer.AsSpan);
}
public static ValueTask WriteByte(byte value, Stream stream, CancellationToken cancellationToken) {
return WritePrimitive(value, sizeof(byte), static (span, value) => span[0] = value, stream, cancellationToken);
return WriteValue(value, sizeof(byte), static (span, value) => span[0] = value, stream, cancellationToken);
}
public static ValueTask<byte> ReadByte(Stream stream, CancellationToken cancellationToken) {
return ReadPrimitive(static span => span[0], sizeof(byte), stream, cancellationToken);
return ReadValue(static span => span[0], sizeof(byte), stream, cancellationToken);
}
public static ValueTask WriteUnsignedShort(ushort value, Stream stream, CancellationToken cancellationToken) {
return WritePrimitive(value, sizeof(ushort), BinaryPrimitives.WriteUInt16LittleEndian, stream, cancellationToken);
return WriteValue(value, sizeof(ushort), BinaryPrimitives.WriteUInt16LittleEndian, stream, cancellationToken);
}
public static ValueTask<ushort> ReadUnsignedShort(Stream stream, CancellationToken cancellationToken) {
return ReadPrimitive(BinaryPrimitives.ReadUInt16LittleEndian, sizeof(ushort), stream, cancellationToken);
return ReadValue(BinaryPrimitives.ReadUInt16LittleEndian, sizeof(ushort), stream, cancellationToken);
}
public static ValueTask WriteSignedInt(int value, Stream stream, CancellationToken cancellationToken) {
return WritePrimitive(value, sizeof(int), BinaryPrimitives.WriteInt32LittleEndian, stream, cancellationToken);
return WriteValue(value, sizeof(int), BinaryPrimitives.WriteInt32LittleEndian, stream, cancellationToken);
}
public static ValueTask<int> ReadSignedInt(Stream stream, CancellationToken cancellationToken) {
return ReadPrimitive(BinaryPrimitives.ReadInt32LittleEndian, sizeof(int), stream, cancellationToken);
return ReadValue(BinaryPrimitives.ReadInt32LittleEndian, sizeof(int), stream, cancellationToken);
}
public static ValueTask WriteUnsignedInt(uint value, Stream stream, CancellationToken cancellationToken) {
return WritePrimitive(value, sizeof(uint), BinaryPrimitives.WriteUInt32LittleEndian, stream, cancellationToken);
return WriteValue(value, sizeof(uint), BinaryPrimitives.WriteUInt32LittleEndian, stream, cancellationToken);
}
public static ValueTask<uint> ReadUnsignedInt(Stream stream, CancellationToken cancellationToken) {
return ReadPrimitive(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
return ReadValue(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
}
public static ValueTask WriteGuid(Guid guid, Stream stream, CancellationToken cancellationToken) {
static void Write(Span<byte> span, Guid guid) {
if (!guid.TryWriteBytes(span)) {
throw new ArgumentException("Span is not large enough to write a GUID.", nameof(span));
}
}
return WriteValue(guid, size: GuidBytes, Write, stream, cancellationToken);
}
public static ValueTask<Guid> ReadGuid(Stream stream, CancellationToken cancellationToken) {
return ReadValue(static span => new Guid(span), size: GuidBytes, stream, cancellationToken);
}
public static ValueTask WriteAuthToken(AuthToken authToken, Stream stream, CancellationToken cancellationToken) {
return stream.WriteAsync(authToken.Bytes.AsMemory(), cancellationToken);
}
public static ValueTask<AuthToken> ReadAuthToken(Stream stream, CancellationToken cancellationToken) {
return ReadValue(static span => new AuthToken([..span]), AuthToken.Length, stream, cancellationToken);
}
public static async ValueTask<ReadOnlyMemory<byte>> ReadBytes(int length, Stream stream, CancellationToken cancellationToken) {

View File

@@ -1,6 +1,9 @@
namespace Phantom.Utils.Monads;
public abstract record Either<TLeft, TRight> {
public abstract bool IsLeft { get; }
public abstract bool IsRight { get; }
public abstract TLeft RequireLeft { get; }
public abstract TRight RequireRight { get; }

View File

@@ -1,6 +1,9 @@
namespace Phantom.Utils.Monads;
public sealed record Left<TLeft, TRight>(TLeft Value) : Either<TLeft, TRight> {
public override bool IsLeft => true;
public override bool IsRight => false;
public override TLeft RequireLeft => Value;
public override TRight RequireRight => throw new InvalidOperationException("Either<" + typeof(TLeft).Name + ", " + typeof(TRight).Name + "> has a left value, but right value was requested.");

View File

@@ -1,6 +1,9 @@
namespace Phantom.Utils.Monads;
public sealed record Right<TLeft, TRight>(TRight Value) : Either<TLeft, TRight> {
public override bool IsLeft => false;
public override bool IsRight => true;
public override TLeft RequireLeft => throw new InvalidOperationException("Either<" + typeof(TLeft).Name + ", " + typeof(TRight).Name + "> has a right value, but left value was requested.");
public override TRight RequireRight => Value;

View File

@@ -48,8 +48,6 @@ try {
string dataProtectionKeysPath = Path.GetFullPath("./keys");
CreateFolderOrStop(dataProtectionKeysPath, Chmod.URWX);
var (certificateThumbprint, authToken) = webKey.Value;
var administratorToken = TokenGenerator.Create(60);
var applicationProperties = new ApplicationProperties(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken));
@@ -57,12 +55,13 @@ try {
Host: controllerHost,
Port: controllerPort,
DistinguishedName: "phantom-controller",
CertificateThumbprint: certificateThumbprint,
CertificateThumbprint: webKey.Value.CertificateThumbprint,
AuthToken: webKey.Value.AuthToken,
SendQueueCapacity: 500,
PingInterval: TimeSpan.FromSeconds(10)
);
using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, null, WebMessageRegistries.Definitions, shutdownCancellationToken);
using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, WebMessageRegistries.Definitions, shutdownCancellationToken);
if (rpcClient == null) {
return 1;
}
@@ -84,9 +83,9 @@ try {
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
await WebLauncher.Launch(webConfiguration, webApplication);
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
PhantomLogger.Root.Information("Phantom Panel web is ready.");
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
} finally {