mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-17 12:24:49 +02:00
Compare commits
2 Commits
47e563f7ce
...
3f8e46ae2d
Author | SHA1 | Date | |
---|---|---|---|
3f8e46ae2d
|
|||
584e8acfd0
|
@@ -1,9 +1,14 @@
|
||||
using Akka.Actor;
|
||||
using System.Collections.Immutable;
|
||||
using Akka.Actor;
|
||||
using Phantom.Agent.Minecraft.Java;
|
||||
using Phantom.Agent.Services.Backups;
|
||||
using Phantom.Agent.Services.Instances;
|
||||
using Phantom.Agent.Services.Rpc;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
@@ -15,6 +20,7 @@ public sealed class AgentServices {
|
||||
|
||||
public ActorSystem ActorSystem { get; }
|
||||
|
||||
private AgentInfo AgentInfo { get; }
|
||||
private AgentFolders AgentFolders { get; }
|
||||
private AgentState AgentState { get; }
|
||||
private BackupManager BackupManager { get; }
|
||||
@@ -26,6 +32,7 @@ public sealed class AgentServices {
|
||||
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) {
|
||||
this.ActorSystem = ActorSystemFactory.Create("Agent");
|
||||
|
||||
this.AgentInfo = agentInfo;
|
||||
this.AgentFolders = agentFolders;
|
||||
this.AgentState = new AgentState();
|
||||
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
|
||||
@@ -43,6 +50,48 @@ public sealed class AgentServices {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
||||
Logger.Information("Registering with the controller...");
|
||||
|
||||
Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError> registrationResult;
|
||||
try {
|
||||
registrationResult = await connection.Send<RegisterAgentMessage, Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||
} catch (Exception e) {
|
||||
Logger.Fatal(e, "Registration failed.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!registrationResult) {
|
||||
Logger.Fatal("Registration failed: {Error}", registrationResult.Error switch {
|
||||
RegisterAgentError.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
|
||||
_ => "Unknown error " + (byte) registrationResult.Error + ".",
|
||||
});
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach (var configureInstanceMessage in registrationResult.Value) {
|
||||
var configureInstanceCommand = new InstanceManagerActor.ConfigureInstanceCommand(
|
||||
configureInstanceMessage.InstanceGuid,
|
||||
configureInstanceMessage.Configuration,
|
||||
configureInstanceMessage.LaunchProperties,
|
||||
configureInstanceMessage.LaunchNow,
|
||||
AlwaysReportStatus: true
|
||||
);
|
||||
|
||||
var configureInstanceResult = await InstanceManager.Request(configureInstanceCommand, cancellationToken);
|
||||
if (!configureInstanceResult.Is(ConfigureInstanceResult.Success)) {
|
||||
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configureInstanceMessage.Configuration.InstanceName, configureInstanceMessage.InstanceGuid);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
await connection.Send(new AdvertiseJavaRuntimesMessage(JavaRuntimeRepository.All), cancellationToken);
|
||||
InstanceTicketManager.RefreshAgentStatus();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task Shutdown() {
|
||||
Logger.Information("Stopping services...");
|
||||
|
||||
|
@@ -1,15 +1,20 @@
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Agent.Services.Rpc;
|
||||
|
||||
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) {
|
||||
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||
return sendChannel.SendMessage(message, CancellationToken.None /* TODO */);
|
||||
public ValueTask Send<TMessage>(TMessage message, CancellationToken cancellationToken) where TMessage : IMessageToController {
|
||||
return sendChannel.SendMessage(message, cancellationToken);
|
||||
}
|
||||
|
||||
// TODO handle properly
|
||||
public bool TrySend<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||
return sendChannel.TrySendMessage(message);
|
||||
}
|
||||
|
||||
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
|
||||
return sendChannel.SendMessage<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@@ -1,82 +1,32 @@
|
||||
using Phantom.Agent.Services.Instances;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Agent.Services.Rpc;
|
||||
|
||||
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToAgent> {
|
||||
private static ILogger Logger { get; } = PhantomLogger.Create<ControllerMessageHandlerActor>();
|
||||
|
||||
public readonly record struct Init(RpcSendChannel<IMessageToController> SendChannel, AgentServices Agent, CancellationTokenSource ShutdownTokenSource);
|
||||
public readonly record struct Init(AgentServices Agent);
|
||||
|
||||
public static Props<IMessageToAgent> Factory(Init init) {
|
||||
return Props<IMessageToAgent>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||
}
|
||||
|
||||
private readonly RpcSendChannel<IMessageToController> sendChannel;
|
||||
private readonly AgentServices agent;
|
||||
private readonly CancellationTokenSource shutdownTokenSource;
|
||||
|
||||
private ControllerMessageHandlerActor(Init init) {
|
||||
this.sendChannel = init.SendChannel;
|
||||
this.agent = init.Agent;
|
||||
this.shutdownTokenSource = init.ShutdownTokenSource;
|
||||
|
||||
ReceiveAsync<RegisterAgentSuccessMessage>(HandleRegisterAgentSuccess);
|
||||
Receive<RegisterAgentFailureMessage>(HandleRegisterAgentFailure);
|
||||
ReceiveAndReplyLater<ConfigureInstanceMessage, Result<ConfigureInstanceResult, InstanceActionFailure>>(HandleConfigureInstance);
|
||||
ReceiveAndReplyLater<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(HandleLaunchInstance);
|
||||
ReceiveAndReplyLater<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(HandleStopInstance);
|
||||
ReceiveAndReplyLater<SendCommandToInstanceMessage, Result<SendCommandToInstanceResult, InstanceActionFailure>>(HandleSendCommandToInstance);
|
||||
}
|
||||
|
||||
private async Task HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
|
||||
Logger.Information("Agent authentication successful.");
|
||||
|
||||
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
|
||||
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, instanceGuid);
|
||||
shutdownTokenSource.Cancel();
|
||||
}
|
||||
|
||||
foreach (var configureInstanceMessage in message.InitialInstanceConfigurations) {
|
||||
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
|
||||
if (!result.Is(ConfigureInstanceResult.Success)) {
|
||||
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
await sendChannel.SendMessage(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All), CancellationToken.None);
|
||||
agent.InstanceTicketManager.RefreshAgentStatus();
|
||||
}
|
||||
|
||||
private void HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
|
||||
string errorMessage = message.FailureKind switch {
|
||||
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
|
||||
RegisterAgentFailure.InvalidToken => "Invalid token.",
|
||||
_ => "Unknown error " + (byte) message.FailureKind + "."
|
||||
};
|
||||
|
||||
Logger.Fatal("Agent authentication failed: {Error}", errorMessage);
|
||||
|
||||
PhantomLogger.Dispose();
|
||||
Environment.Exit(1);
|
||||
}
|
||||
|
||||
private Task<Result<ConfigureInstanceResult, InstanceActionFailure>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
|
||||
return agent.InstanceManager.Request(new InstanceManagerActor.ConfigureInstanceCommand(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus));
|
||||
}
|
||||
|
||||
private async Task<Result<ConfigureInstanceResult, InstanceActionFailure>> HandleConfigureInstance(ConfigureInstanceMessage message) {
|
||||
return await HandleConfigureInstance(message, alwaysReportStatus: false);
|
||||
return await agent.InstanceManager.Request(new InstanceManagerActor.ConfigureInstanceCommand(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, AlwaysReportStatus: false));
|
||||
}
|
||||
|
||||
private async Task<Result<LaunchInstanceResult, InstanceActionFailure>> HandleLaunchInstance(LaunchInstanceMessage message) {
|
||||
|
@@ -1,7 +0,0 @@
|
||||
using Phantom.Common.Data;
|
||||
|
||||
namespace Phantom.Agent.Services.Rpc;
|
||||
|
||||
sealed class RpcClientAgentHandshake(AuthToken authToken) {
|
||||
|
||||
}
|
@@ -46,41 +46,45 @@ try {
|
||||
return 1;
|
||||
}
|
||||
|
||||
var (certificateThumbprint, authToken) = agentKey.Value;
|
||||
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
||||
|
||||
var rpcClientConnectionParameters = new RpcClientConnectionParameters(
|
||||
Host: controllerHost,
|
||||
Port: controllerPort,
|
||||
DistinguishedName: "phantom-controller",
|
||||
CertificateThumbprint: certificateThumbprint,
|
||||
CertificateThumbprint: agentKey.Value.CertificateThumbprint,
|
||||
AuthToken: agentKey.Value.AuthToken,
|
||||
SendQueueCapacity: 500,
|
||||
PingInterval: TimeSpan.FromSeconds(10)
|
||||
);
|
||||
|
||||
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
|
||||
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, AgentMessageRegistries.Definitions, shutdownCancellationToken);
|
||||
if (rpcClient == null) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
var controllerConnection = new ControllerConnection(rpcClient.SendChannel);
|
||||
|
||||
Task? rpcClientListener = null;
|
||||
try {
|
||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
|
||||
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), controllerConnection);
|
||||
await agentServices.Initialize();
|
||||
|
||||
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource);
|
||||
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(agentServices);
|
||||
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
||||
|
||||
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
|
||||
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
|
||||
|
||||
if (await agentServices.Register(controllerConnection, shutdownCancellationToken)) {
|
||||
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
|
||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||
}
|
||||
|
||||
await agentServices.Shutdown();
|
||||
} finally {
|
||||
try {
|
||||
await rpcClient.SendChannel.SendMessage(new UnregisterAgentMessage(), CancellationToken.None);
|
||||
await controllerConnection.Send(new UnregisterAgentMessage(), CancellationToken.None);
|
||||
// TODO wait for acknowledgment
|
||||
} catch (Exception e) {
|
||||
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
||||
|
@@ -1,4 +1,5 @@
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
namespace Phantom.Common.Data;
|
||||
|
||||
|
5
Common/Phantom.Common.Data/Replies/RegisterAgentError.cs
Normal file
5
Common/Phantom.Common.Data/Replies/RegisterAgentError.cs
Normal file
@@ -0,0 +1,5 @@
|
||||
namespace Phantom.Common.Data.Replies;
|
||||
|
||||
public enum RegisterAgentError : byte {
|
||||
ConnectionAlreadyHasAnAgent = 0,
|
||||
}
|
@@ -1,6 +0,0 @@
|
||||
namespace Phantom.Common.Data.Replies;
|
||||
|
||||
public enum RegisterAgentFailure : byte {
|
||||
ConnectionAlreadyHasAnAgent,
|
||||
InvalidToken
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Result;
|
||||
|
||||
namespace Phantom.Common.Data;
|
||||
@@ -24,6 +25,9 @@ public sealed partial class Result<TValue, TError> {
|
||||
[MemoryPackIgnore]
|
||||
public TError Error => !hasValue ? error! : throw new InvalidOperationException("Attempted to get error from a success result.");
|
||||
|
||||
[MemoryPackIgnore]
|
||||
public Either<TValue, TError> AsEither => hasValue ? Either.Left(value!) : Either.Right(error!);
|
||||
|
||||
private Result(bool hasValue, TValue? value, TError? error) {
|
||||
this.hasValue = hasValue;
|
||||
this.value = value;
|
||||
|
@@ -14,8 +14,6 @@ public static class AgentMessageRegistries {
|
||||
public static IMessageDefinitions<IMessageToController, IMessageToAgent> Definitions { get; } = new MessageDefinitions();
|
||||
|
||||
static AgentMessageRegistries() {
|
||||
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
||||
ToAgent.Add<RegisterAgentFailureMessage>(1);
|
||||
ToAgent.Add<ConfigureInstanceMessage, Result<ConfigureInstanceResult, InstanceActionFailure>>(2);
|
||||
ToAgent.Add<LaunchInstanceMessage, Result<LaunchInstanceResult, InstanceActionFailure>>(3);
|
||||
ToAgent.Add<StopInstanceMessage, Result<StopInstanceResult, InstanceActionFailure>>(4);
|
||||
|
@@ -5,5 +5,5 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterAgentFailureMessage(
|
||||
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
|
||||
[property: MemoryPackOrder(0)] RegisterAgentError ErrorKind
|
||||
) : IMessageToAgent;
|
||||
|
@@ -1,11 +1,14 @@
|
||||
using MemoryPack;
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
namespace Phantom.Common.Messages.Agent.ToController;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterAgentMessage(
|
||||
[property: MemoryPackOrder(0)] AuthToken AuthToken,
|
||||
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
|
||||
) : IMessageToController;
|
||||
[property: MemoryPackOrder(0)] AgentInfo AgentInfo
|
||||
) : IMessageToController, ICanReply<Result<ImmutableArray<ConfigureInstanceMessage>, RegisterAgentError>>;
|
||||
|
@@ -1,9 +0,0 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data;
|
||||
|
||||
namespace Phantom.Common.Messages.Web.ToController;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record RegisterWebMessage(
|
||||
[property: MemoryPackOrder(0)] AuthToken AuthToken
|
||||
) : IMessageToController;
|
@@ -21,7 +21,6 @@ public static class WebMessageRegistries {
|
||||
public static IMessageDefinitions<IMessageToController, IMessageToWeb> Definitions { get; } = new MessageDefinitions();
|
||||
|
||||
static WebMessageRegistries() {
|
||||
ToController.Add<RegisterWebMessage>(0);
|
||||
ToController.Add<UnregisterWebMessage>(1);
|
||||
ToController.Add<LogInMessage, Optional<LogInSuccess>>(2);
|
||||
ToController.Add<LogOutMessage>(3);
|
||||
@@ -42,7 +41,6 @@ public static class WebMessageRegistries {
|
||||
ToController.Add<GetAuditLogMessage, Result<ImmutableArray<AuditLogItem>, UserActionFailure>>(18);
|
||||
ToController.Add<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(19);
|
||||
|
||||
ToWeb.Add<RegisterWebResultMessage>(0);
|
||||
ToWeb.Add<RefreshAgentsMessage>(1);
|
||||
ToWeb.Add<RefreshInstancesMessage>(2);
|
||||
ToWeb.Add<InstanceOutputMessage>(3);
|
||||
|
@@ -13,6 +13,7 @@ using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
@@ -64,7 +65,7 @@ sealed class AgentManager {
|
||||
|
||||
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgent> connection) {
|
||||
if (!this.authToken.FixedTimeEquals(authToken)) {
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.InvalidToken));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@@ -1,5 +1,4 @@
|
||||
using Akka.Actor;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToController;
|
||||
using Phantom.Common.Messages.Web;
|
||||
@@ -13,7 +12,7 @@ using Phantom.Controller.Services.Rpc;
|
||||
using Phantom.Controller.Services.Users;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Runtime2;
|
||||
using Phantom.Utils.Rpc;
|
||||
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
|
||||
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
|
||||
|
||||
|
@@ -40,12 +40,11 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
Receive<ReportInstancePlayerCountsMessage>(HandleReportInstancePlayerCounts);
|
||||
Receive<ReportInstanceEventMessage>(HandleReportInstanceEvent);
|
||||
Receive<InstanceOutputMessage>(HandleInstanceOutput);
|
||||
Receive<ReplyMessage>(HandleReply);
|
||||
}
|
||||
|
||||
private async Task HandleRegisterAgent(RegisterAgentMessage message) {
|
||||
if (agentGuid != message.AgentInfo.AgentGuid) {
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.ConnectionAlreadyHasAnAgent));
|
||||
}
|
||||
else {
|
||||
await agentRegistrationHandler.TryRegisterImpl(connection, message);
|
||||
@@ -84,8 +83,4 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||
private void HandleInstanceOutput(InstanceOutputMessage message) {
|
||||
instanceLogManager.ReceiveLines(message.InstanceGuid, message.Lines);
|
||||
}
|
||||
|
||||
private void HandleReply(ReplyMessage message) {
|
||||
connection.Receive(message);
|
||||
}
|
||||
}
|
||||
|
@@ -1,34 +0,0 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Messages.Agent.Handshake;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
||||
public sealed class RpcServerAgentHandshake(AuthToken authToken) : RpcServerHandshake {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<RpcServerAgentHandshake>();
|
||||
|
||||
protected override async Task<bool> AcceptClient(string remoteAddress, Stream stream, CancellationToken cancellationToken) {
|
||||
Memory<byte> buffer = new Memory<byte>(new byte[AuthToken.Length]);
|
||||
await stream.ReadExactlyAsync(buffer, cancellationToken: cancellationToken);
|
||||
|
||||
if (!authToken.FixedTimeEquals(buffer.Span)) {
|
||||
Logger.Warning("Rejected client {}, invalid authorization token.", remoteAddress);
|
||||
await Respond(remoteAddress, stream, new InvalidAuthToken(), cancellationToken);
|
||||
return false;
|
||||
}
|
||||
|
||||
AgentInfo agentInfo = await Serialization.Deserialize<AgentInfo>(stream, cancellationToken);
|
||||
return true;
|
||||
}
|
||||
|
||||
private async ValueTask Respond(string remoteAddress, Stream stream, IAgentHandshakeResult result, CancellationToken cancellationToken) {
|
||||
try {
|
||||
await Serialization.Serialize(result, stream, cancellationToken);
|
||||
} catch (Exception e) {
|
||||
Logger.Error(e, "Could not send handshake result to client {}.", remoteAddress);
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,5 +1,4 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web;
|
||||
using Phantom.Common.Messages.Web.ToController;
|
||||
using Phantom.Common.Messages.Web.ToWeb;
|
||||
using Phantom.Controller.Minecraft;
|
||||
@@ -10,7 +9,7 @@ using Phantom.Controller.Services.Users;
|
||||
using Phantom.Controller.Services.Users.Sessions;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime2;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Rpc;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
namespace Phantom.Controller;
|
||||
|
||||
|
@@ -3,7 +3,7 @@ using Phantom.Utils.Cryptography;
|
||||
using Phantom.Utils.IO;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
|
@@ -2,7 +2,6 @@
|
||||
using Phantom.Controller;
|
||||
using Phantom.Controller.Database.Postgres;
|
||||
using Phantom.Controller.Services;
|
||||
using Phantom.Controller.Services.Rpc;
|
||||
using Phantom.Utils.IO;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
@@ -58,8 +57,8 @@ try {
|
||||
await controllerServices.Initialize();
|
||||
|
||||
LinkedTasks<bool> rpcServerTasks = new LinkedTasks<bool>([
|
||||
new RpcServer("Agent", agentRpcServerHost, agentKeyData.Certificate, new RpcServerAgentHandshake(agentKeyData.AuthToken)).Run(shutdownCancellationToken),
|
||||
// new RpcServer("Web", webRpcServerHost, webKeyData.Certificate).Run(shutdownCancellationToken),
|
||||
new RpcServer("Agent", agentRpcServerHost, agentKeyData.AuthToken, agentKeyData.Certificate).Run(shutdownCancellationToken),
|
||||
new RpcServer("Web", webRpcServerHost, agentKeyData.AuthToken, webKeyData.Certificate).Run(shutdownCancellationToken),
|
||||
]);
|
||||
|
||||
// If either RPC server crashes, stop the whole process.
|
||||
|
@@ -1,18 +1,12 @@
|
||||
using System.Collections.Immutable;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Security.Cryptography;
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Common.Data;
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
[SuppressMessage("ReSharper", "MemberCanBePrivate.Global")]
|
||||
public sealed partial class AuthToken {
|
||||
public sealed class AuthToken {
|
||||
public const int Length = 12;
|
||||
|
||||
[MemoryPackOrder(0)]
|
||||
[MemoryPackInclude]
|
||||
public readonly ImmutableArray<byte> Bytes;
|
||||
public ImmutableArray<byte> Bytes { get; }
|
||||
|
||||
public AuthToken(ImmutableArray<byte> bytes) {
|
||||
if (bytes.Length != Length) {
|
||||
@@ -30,10 +24,6 @@ public sealed partial class AuthToken {
|
||||
return CryptographicOperations.FixedTimeEquals(Bytes.AsSpan(), other);
|
||||
}
|
||||
|
||||
internal void WriteTo(Span<byte> span) {
|
||||
Bytes.CopyTo(span);
|
||||
}
|
||||
|
||||
public static AuthToken Generate() {
|
||||
return new AuthToken([..RandomNumberGenerator.GetBytes(Length)]);
|
||||
}
|
@@ -29,7 +29,7 @@ sealed record MessageFrame(uint MessageId, ushort RegistryCode, ReadOnlyMemory<b
|
||||
|
||||
private static void CheckMessageLength(int messageLength) {
|
||||
if (messageLength < 0) {
|
||||
throw new RpcErrorException("Message length is negative", RpcError.InvalidData);
|
||||
throw new RpcErrorException("Message length is negative.", RpcError.InvalidData);
|
||||
}
|
||||
|
||||
if (messageLength > MaxMessageBytes) {
|
||||
|
@@ -27,7 +27,7 @@ sealed record ReplyFrame(uint ReplyingToMessageId, ReadOnlyMemory<byte> Serializ
|
||||
|
||||
private static void CheckReplyLength(int replyLength) {
|
||||
if (replyLength < 0) {
|
||||
throw new RpcErrorException("Reply length is negative", RpcError.InvalidData);
|
||||
throw new RpcErrorException("Reply length is negative.", RpcError.InvalidData);
|
||||
}
|
||||
|
||||
if (replyLength > MaxReplyBytes) {
|
||||
|
@@ -12,7 +12,7 @@ public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
||||
|
||||
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
|
||||
if (HasReplyType(typeof(TMessage))) {
|
||||
throw new ArgumentException("This overload is for messages without a reply");
|
||||
throw new ArgumentException("This overload is for messages without a reply.");
|
||||
}
|
||||
|
||||
AddTypeCodeMapping<TMessage>(code);
|
||||
|
@@ -8,8 +8,8 @@ using Serilog;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
|
||||
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
|
||||
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters, handshake);
|
||||
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
|
||||
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters);
|
||||
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
|
||||
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
|
||||
}
|
||||
@@ -85,8 +85,6 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
|
||||
logger.Error(e, "Caught exception while closing connection.");
|
||||
}
|
||||
|
||||
// TODO disconnection handshake?
|
||||
|
||||
logger.Information("Client shut down.");
|
||||
}
|
||||
|
||||
|
@@ -7,6 +7,7 @@ public readonly record struct RpcClientConnectionParameters(
|
||||
ushort Port,
|
||||
string DistinguishedName,
|
||||
RpcCertificateThumbprint CertificateThumbprint,
|
||||
AuthToken AuthToken,
|
||||
ushort SendQueueCapacity,
|
||||
TimeSpan PingInterval
|
||||
);
|
||||
|
@@ -15,16 +15,16 @@ internal sealed class RpcClientConnector {
|
||||
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly Guid sessionId;
|
||||
private readonly RpcClientConnectionParameters parameters;
|
||||
private readonly RpcClientHandshake handshake;
|
||||
private readonly SslClientAuthenticationOptions sslOptions;
|
||||
|
||||
private bool loggedCertificateValidationError = false;
|
||||
|
||||
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters, RpcClientHandshake handshake) {
|
||||
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters) {
|
||||
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
|
||||
this.sessionId = Guid.NewGuid();
|
||||
this.parameters = parameters;
|
||||
this.handshake = handshake;
|
||||
|
||||
this.sslOptions = new SslClientAuthenticationOptions {
|
||||
AllowRenegotiation = false,
|
||||
@@ -91,7 +91,7 @@ internal sealed class RpcClientConnector {
|
||||
logger.Information("Established a secure connection.");
|
||||
|
||||
try {
|
||||
await handshake.AcceptServer(stream, cancellationToken);
|
||||
await PerformApplicationHandshake(stream, cancellationToken);
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
handledException = true;
|
||||
@@ -142,6 +142,12 @@ internal sealed class RpcClientConnector {
|
||||
return false;
|
||||
}
|
||||
|
||||
private async Task PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
||||
await Serialization.WriteAuthToken(parameters.AuthToken, stream, cancellationToken);
|
||||
await Serialization.WriteGuid(sessionId, stream, cancellationToken);
|
||||
// TODO read response
|
||||
}
|
||||
|
||||
private static async Task DisconnectSocket(Socket socket, Stream? stream) {
|
||||
if (stream != null) {
|
||||
await stream.DisposeAsync();
|
||||
|
@@ -1,5 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcClientHandshake {
|
||||
protected internal abstract Task<bool> AcceptServer(Stream stream, CancellationToken cancellationToken);
|
||||
}
|
@@ -1,6 +1,6 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public enum RpcHandshakeResult : byte {
|
||||
UnknownError = 0,
|
||||
InvalidFormat = 1,
|
||||
Success = 0,
|
||||
InvalidAuthToken = 1,
|
||||
}
|
||||
|
@@ -3,12 +3,13 @@ using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCertificate certificate, RpcServerHandshake handshake) {
|
||||
public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken authToken, RpcServerCertificate certificate) {
|
||||
private readonly ILogger logger = PhantomLogger.Create<RpcServer>(loggerName);
|
||||
private readonly List<Client> clients = [];
|
||||
|
||||
@@ -39,7 +40,7 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
|
||||
while (!shutdownToken.IsCancellationRequested) {
|
||||
Socket clientSocket = await serverSocket.AcceptAsync(shutdownToken);
|
||||
clients.Add(new Client(logger, clientSocket, sslOptions, handshake, shutdownToken));
|
||||
clients.Add(new Client(logger, clientSocket, sslOptions, authToken, shutdownToken));
|
||||
clients.RemoveAll(static client => client.Task.IsCompleted);
|
||||
}
|
||||
} finally {
|
||||
@@ -77,17 +78,19 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
|
||||
public Task Task { get; }
|
||||
|
||||
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly Socket socket;
|
||||
private readonly SslServerAuthenticationOptions sslOptions;
|
||||
private readonly RpcServerHandshake handshake;
|
||||
private readonly AuthToken authToken;
|
||||
private readonly CancellationToken shutdownToken;
|
||||
|
||||
public Client(ILogger logger, Socket socket, SslServerAuthenticationOptions sslOptions, RpcServerHandshake handshake, CancellationToken shutdownToken) {
|
||||
public Client(ILogger logger, Socket socket, SslServerAuthenticationOptions sslOptions, AuthToken authToken, CancellationToken shutdownToken) {
|
||||
this.logger = logger;
|
||||
this.socket = socket;
|
||||
this.sslOptions = sslOptions;
|
||||
this.handshake = handshake;
|
||||
this.authToken = authToken;
|
||||
this.shutdownToken = shutdownToken;
|
||||
this.Task = Run();
|
||||
}
|
||||
@@ -98,13 +101,18 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
|
||||
try {
|
||||
await stream.AuthenticateAsServerAsync(sslOptions, shutdownToken);
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Could not establish a secure connection.");
|
||||
return;
|
||||
}
|
||||
|
||||
Either<Guid, RpcHandshakeResult> handshakeResult;
|
||||
try {
|
||||
await handshake.AcceptClient(socket.RemoteEndPoint?.ToString() ?? "<unknown address>", stream, shutdownToken);
|
||||
handshakeResult = await PerformApplicationHandshake(stream, shutdownToken);
|
||||
} catch (OperationCanceledException) {
|
||||
throw;
|
||||
} catch (EndOfStreamException) {
|
||||
logger.Warning("Could not perform application handshake, connection lost.");
|
||||
return;
|
||||
@@ -113,9 +121,17 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] buffer = new byte[1024];
|
||||
int readBytes;
|
||||
while ((readBytes = await stream.ReadAsync(buffer, shutdownToken)) > 0) {}
|
||||
switch (handshakeResult) {
|
||||
case Left<Guid, RpcHandshakeResult>(var sessionId):
|
||||
await Serialization.WriteByte((byte) RpcHandshakeResult.Success, stream, shutdownToken);
|
||||
break;
|
||||
|
||||
case Right<Guid, RpcHandshakeResult>(var error):
|
||||
await Serialization.WriteByte((byte) error, stream, shutdownToken);
|
||||
break;
|
||||
}
|
||||
} catch (OperationCanceledException) {
|
||||
logger.Warning("Cancelling an incoming client due to shutdown.");
|
||||
} finally {
|
||||
try {
|
||||
using var timeoutTokenSource = new CancellationTokenSource(DisconnectTimeout);
|
||||
@@ -129,5 +145,16 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, RpcServerCer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Either<Guid, RpcHandshakeResult>> PerformApplicationHandshake(Stream stream, CancellationToken cancellationToken) {
|
||||
var clientAuthToken = await Serialization.ReadAuthToken(stream, cancellationToken);
|
||||
if (!authToken.FixedTimeEquals(clientAuthToken)) {
|
||||
logger.Warning("Rejected client {}, invalid authorization token.", Address);
|
||||
return Either.Right(RpcHandshakeResult.InvalidAuthToken);
|
||||
}
|
||||
|
||||
var sessionId = await Serialization.ReadGuid(stream, cancellationToken);
|
||||
return Either.Left(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +0,0 @@
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public abstract class RpcServerHandshake {
|
||||
protected internal abstract Task<bool> AcceptClient(string remoteAddress, Stream stream, CancellationToken cancellationToken);
|
||||
}
|
@@ -1,8 +1,7 @@
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Phantom.Utils.Monads;
|
||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
namespace Phantom.Utils.Rpc.Runtime.Tls;
|
||||
|
||||
public sealed class RpcServerCertificate {
|
||||
public static byte[] CreateAndExport(string commonName) {
|
@@ -5,50 +5,73 @@ using MemoryPack;
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
static class Serialization {
|
||||
private const int GuidBytes = 16;
|
||||
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
|
||||
|
||||
private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
|
||||
private static async ValueTask WriteValue<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
|
||||
using var buffer = RentedMemory<byte>.Rent(size);
|
||||
writer(buffer.AsSpan, value);
|
||||
await stream.WriteAsync(buffer.AsMemory, cancellationToken);
|
||||
}
|
||||
|
||||
private static async ValueTask<T> ReadPrimitive<T>(Func<ReadOnlySpan<byte>, T> reader, int size, Stream stream, CancellationToken cancellationToken) {
|
||||
private static async ValueTask<T> ReadValue<T>(Func<ReadOnlySpan<byte>, T> reader, int size, Stream stream, CancellationToken cancellationToken) {
|
||||
using var buffer = RentedMemory<byte>.Rent(size);
|
||||
await stream.ReadExactlyAsync(buffer.AsMemory, cancellationToken);
|
||||
return reader(buffer.AsSpan);
|
||||
}
|
||||
|
||||
public static ValueTask WriteByte(byte value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(byte), static (span, value) => span[0] = value, stream, cancellationToken);
|
||||
return WriteValue(value, sizeof(byte), static (span, value) => span[0] = value, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<byte> ReadByte(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(static span => span[0], sizeof(byte), stream, cancellationToken);
|
||||
return ReadValue(static span => span[0], sizeof(byte), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteUnsignedShort(ushort value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(ushort), BinaryPrimitives.WriteUInt16LittleEndian, stream, cancellationToken);
|
||||
return WriteValue(value, sizeof(ushort), BinaryPrimitives.WriteUInt16LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<ushort> ReadUnsignedShort(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadUInt16LittleEndian, sizeof(ushort), stream, cancellationToken);
|
||||
return ReadValue(BinaryPrimitives.ReadUInt16LittleEndian, sizeof(ushort), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteSignedInt(int value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(int), BinaryPrimitives.WriteInt32LittleEndian, stream, cancellationToken);
|
||||
return WriteValue(value, sizeof(int), BinaryPrimitives.WriteInt32LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<int> ReadSignedInt(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadInt32LittleEndian, sizeof(int), stream, cancellationToken);
|
||||
return ReadValue(BinaryPrimitives.ReadInt32LittleEndian, sizeof(int), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteUnsignedInt(uint value, Stream stream, CancellationToken cancellationToken) {
|
||||
return WritePrimitive(value, sizeof(uint), BinaryPrimitives.WriteUInt32LittleEndian, stream, cancellationToken);
|
||||
return WriteValue(value, sizeof(uint), BinaryPrimitives.WriteUInt32LittleEndian, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<uint> ReadUnsignedInt(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadPrimitive(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
|
||||
return ReadValue(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteGuid(Guid guid, Stream stream, CancellationToken cancellationToken) {
|
||||
static void Write(Span<byte> span, Guid guid) {
|
||||
if (!guid.TryWriteBytes(span)) {
|
||||
throw new ArgumentException("Span is not large enough to write a GUID.", nameof(span));
|
||||
}
|
||||
}
|
||||
|
||||
return WriteValue(guid, size: GuidBytes, Write, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<Guid> ReadGuid(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadValue(static span => new Guid(span), size: GuidBytes, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask WriteAuthToken(AuthToken authToken, Stream stream, CancellationToken cancellationToken) {
|
||||
return stream.WriteAsync(authToken.Bytes.AsMemory(), cancellationToken);
|
||||
}
|
||||
|
||||
public static ValueTask<AuthToken> ReadAuthToken(Stream stream, CancellationToken cancellationToken) {
|
||||
return ReadValue(static span => new AuthToken([..span]), AuthToken.Length, stream, cancellationToken);
|
||||
}
|
||||
|
||||
public static async ValueTask<ReadOnlyMemory<byte>> ReadBytes(int length, Stream stream, CancellationToken cancellationToken) {
|
||||
|
@@ -1,6 +1,9 @@
|
||||
namespace Phantom.Utils.Monads;
|
||||
|
||||
public abstract record Either<TLeft, TRight> {
|
||||
public abstract bool IsLeft { get; }
|
||||
public abstract bool IsRight { get; }
|
||||
|
||||
public abstract TLeft RequireLeft { get; }
|
||||
public abstract TRight RequireRight { get; }
|
||||
|
||||
|
@@ -1,6 +1,9 @@
|
||||
namespace Phantom.Utils.Monads;
|
||||
|
||||
public sealed record Left<TLeft, TRight>(TLeft Value) : Either<TLeft, TRight> {
|
||||
public override bool IsLeft => true;
|
||||
public override bool IsRight => false;
|
||||
|
||||
public override TLeft RequireLeft => Value;
|
||||
public override TRight RequireRight => throw new InvalidOperationException("Either<" + typeof(TLeft).Name + ", " + typeof(TRight).Name + "> has a left value, but right value was requested.");
|
||||
|
||||
|
@@ -1,6 +1,9 @@
|
||||
namespace Phantom.Utils.Monads;
|
||||
|
||||
public sealed record Right<TLeft, TRight>(TRight Value) : Either<TLeft, TRight> {
|
||||
public override bool IsLeft => false;
|
||||
public override bool IsRight => true;
|
||||
|
||||
public override TLeft RequireLeft => throw new InvalidOperationException("Either<" + typeof(TLeft).Name + ", " + typeof(TRight).Name + "> has a right value, but left value was requested.");
|
||||
public override TRight RequireRight => Value;
|
||||
|
||||
|
@@ -48,8 +48,6 @@ try {
|
||||
string dataProtectionKeysPath = Path.GetFullPath("./keys");
|
||||
CreateFolderOrStop(dataProtectionKeysPath, Chmod.URWX);
|
||||
|
||||
var (certificateThumbprint, authToken) = webKey.Value;
|
||||
|
||||
var administratorToken = TokenGenerator.Create(60);
|
||||
var applicationProperties = new ApplicationProperties(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken));
|
||||
|
||||
@@ -57,12 +55,13 @@ try {
|
||||
Host: controllerHost,
|
||||
Port: controllerPort,
|
||||
DistinguishedName: "phantom-controller",
|
||||
CertificateThumbprint: certificateThumbprint,
|
||||
CertificateThumbprint: webKey.Value.CertificateThumbprint,
|
||||
AuthToken: webKey.Value.AuthToken,
|
||||
SendQueueCapacity: 500,
|
||||
PingInterval: TimeSpan.FromSeconds(10)
|
||||
);
|
||||
|
||||
using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, null, WebMessageRegistries.Definitions, shutdownCancellationToken);
|
||||
using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, WebMessageRegistries.Definitions, shutdownCancellationToken);
|
||||
if (rpcClient == null) {
|
||||
return 1;
|
||||
}
|
||||
@@ -84,9 +83,9 @@ try {
|
||||
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
|
||||
|
||||
await WebLauncher.Launch(webConfiguration, webApplication);
|
||||
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
|
||||
|
||||
PhantomLogger.Root.Information("Phantom Panel web is ready.");
|
||||
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
|
||||
|
||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||
} finally {
|
||||
|
Reference in New Issue
Block a user