1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-11-25 16:42:54 +01:00

Compare commits

..

7 Commits

41 changed files with 328 additions and 277 deletions

View File

@ -1,9 +1,10 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="Controller + Agent x3" type="CompoundRunConfigurationType"> <configuration default="false" name="Controller + Web + Agent x3" type="CompoundRunConfigurationType">
<toRun name="Agent 1" type="DotNetProject" /> <toRun name="Agent 1" type="DotNetProject" />
<toRun name="Agent 2" type="DotNetProject" /> <toRun name="Agent 2" type="DotNetProject" />
<toRun name="Agent 3" type="DotNetProject" /> <toRun name="Agent 3" type="DotNetProject" />
<toRun name="Controller" type="DotNetProject" /> <toRun name="Controller" type="DotNetProject" />
<toRun name="Web" type="DotNetProject" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
</component> </component>

View File

@ -1,7 +1,8 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="Controller + Agent" type="CompoundRunConfigurationType"> <configuration default="false" name="Controller + Web + Agent" type="CompoundRunConfigurationType">
<toRun name="Agent 1" type="DotNetProject" /> <toRun name="Agent 1" type="DotNetProject" />
<toRun name="Controller" type="DotNetProject" /> <toRun name="Controller" type="DotNetProject" />
<toRun name="Web" type="DotNetProject" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
</component> </component>

View File

@ -6,6 +6,8 @@
<option name="PASS_PARENT_ENVS" value="1" /> <option name="PASS_PARENT_ENVS" value="1" />
<envs> <envs>
<env name="ASPNETCORE_ENVIRONMENT" value="Development" /> <env name="ASPNETCORE_ENVIRONMENT" value="Development" />
<env name="CONTROLLER_HOST" value="localhost" />
<env name="WEB_KEY" value="BMNHM9RRPMCBBY29D9XHS6KBKZSRY7F5XFN27YZX96XXWJC2NM2D6YRHM9PZN9JGQGCSJ6FMB2GGZ" />
<env name="WEB_SERVER_HOST" value="localhost" /> <env name="WEB_SERVER_HOST" value="localhost" />
</envs> </envs>
<option name="USE_EXTERNAL_CONSOLE" value="0" /> <option name="USE_EXTERNAL_CONSOLE" value="0" />

View File

@ -0,0 +1,25 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Agent.Rpc;
public sealed class ControllerConnection {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
this.connection = connection;
Logger.Information("Connection ready.");
}
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.Send(message);
}
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
}

View File

@ -2,42 +2,37 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, RegisterAgentMessage> { public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public static Task Launch(RpcConfiguration config, AuthToken authToken, AgentInfo agentInfo, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) { public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, AgentInfo agentInfo, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(config, agentInfo.Guid, listenerFactory, new RegisterAgentMessage(authToken, agentInfo), disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
} }
private readonly RpcConfiguration config; private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
private readonly Guid agentGuid;
private RpcClientRuntime(RpcConfiguration config, Guid agentGuid, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> messageListenerFactory, RegisterAgentMessage registerAgentMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, AgentMessageRegistries.Definitions, messageListenerFactory, registerAgentMessage, disconnectSemaphore, receiveCancellationToken) {
this.config = config;
this.agentGuid = agentGuid;
}
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, TaskManager taskManager) {
ServerMessaging.SetCurrentConnection(connection);
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, ILogger logger, TaskManager taskManager) {
var keepAliveLoop = new KeepAliveLoop(connection); var keepAliveLoop = new KeepAliveLoop(connection);
try { try {
base.RunWithConnection(socket, connection, taskManager); base.RunWithConnection(socket, connection, logger, taskManager);
} finally { } finally {
keepAliveLoop.Cancel(); keepAliveLoop.Cancel();
} }
} }
protected override async Task Disconnect(ClientSocket socket) { protected override async Task Disconnect(ClientSocket socket, ILogger logger) {
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage(agentGuid)).ToArray(); var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
try { try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None); await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
} catch (TimeoutException) { } catch (TimeoutException) {
config.RuntimeLogger.Error("Timed out communicating agent shutdown with the controller."); logger.Error("Timed out communicating agent shutdown with the controller.");
} }
} }
} }

View File

@ -1,35 +0,0 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Agent.Rpc;
public static class ServerMessaging {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging));
private static RpcConnectionToServer<IMessageToControllerListener>? CurrentConnection { get; set; }
private static RpcConnectionToServer<IMessageToControllerListener> CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
private static readonly object SetCurrentConnectionLock = new ();
internal static void SetCurrentConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
lock (SetCurrentConnectionLock) {
if (CurrentConnection != null) {
throw new InvalidOperationException("Server connection can only be set once.");
}
CurrentConnection = connection;
}
Logger.Information("Server connection ready.");
}
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return CurrentConnectionOrThrow.Send(message);
}
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
}

View File

@ -1,4 +1,5 @@
using Phantom.Agent.Minecraft.Java; using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
@ -18,12 +19,12 @@ public sealed class AgentServices {
internal JavaRuntimeRepository JavaRuntimeRepository { get; } internal JavaRuntimeRepository JavaRuntimeRepository { get; }
internal InstanceSessionManager InstanceSessionManager { get; } internal InstanceSessionManager InstanceSessionManager { get; }
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration) { public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) {
this.AgentFolders = agentFolders; this.AgentFolders = agentFolders;
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>()); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks); this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
this.JavaRuntimeRepository = new JavaRuntimeRepository(); this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager); this.InstanceSessionManager = new InstanceSessionManager(controllerConnection, agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
} }
public async Task Initialize() { public async Task Initialize() {

View File

@ -1,5 +1,4 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Instances.Procedures; using Phantom.Agent.Services.Instances.Procedures;
using Phantom.Agent.Services.Instances.States; using Phantom.Agent.Services.Instances.States;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
@ -57,7 +56,7 @@ sealed class Instance : IAsyncDisposable {
public void ReportLastStatus() { public void ReportLastStatus() {
TryUpdateStatus("Report last status of instance " + shortName, async () => { TryUpdateStatus("Report last status of instance " + shortName, async () => {
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus)); await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
}); });
} }
@ -65,14 +64,14 @@ sealed class Instance : IAsyncDisposable {
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => { TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => {
if (status != currentStatus) { if (status != currentStatus) {
currentStatus = status; currentStatus = status;
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status)); await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
} }
}); });
} }
private void ReportEvent(IInstanceEvent instanceEvent) { private void ReportEvent(IInstanceEvent instanceEvent) {
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent); var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent);
Services.TaskManager.Run("Report event for instance " + shortName, async () => await ServerMessaging.Send(message)); Services.TaskManager.Run("Report event for instance " + shortName, async () => await Services.ControllerConnection.Send(message));
} }
internal void TransitionState(IInstanceState newState) { internal void TransitionState(IInstanceState newState) {

View File

@ -16,12 +16,14 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200); private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
private readonly ControllerConnection controllerConnection;
private readonly Guid instanceGuid; private readonly Guid instanceGuid;
private readonly Channel<string> outputChannel; private readonly Channel<string> outputChannel;
private int droppedLinesSinceLastSend; private int droppedLinesSinceLastSend;
public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) { public InstanceLogSender(ControllerConnection controllerConnection, TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
this.controllerConnection = controllerConnection;
this.instanceGuid = instanceGuid; this.instanceGuid = instanceGuid;
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped); this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
Start(); Start();
@ -61,7 +63,7 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
private async Task SendOutputToServer(ImmutableArray<string> lines) { private async Task SendOutputToServer(ImmutableArray<string> lines) {
if (!lines.IsEmpty) { if (!lines.IsEmpty) {
await ServerMessaging.Send(new InstanceOutputMessage(instanceGuid, lines)); await controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
} }
} }

View File

@ -1,7 +1,8 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
sealed record InstanceServices(TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices); sealed record InstanceServices(ControllerConnection ControllerConnection, TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices);

View File

@ -24,6 +24,7 @@ namespace Phantom.Agent.Services.Instances;
sealed class InstanceSessionManager : IAsyncDisposable { sealed class InstanceSessionManager : IAsyncDisposable {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>(); private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>();
private readonly ControllerConnection controllerConnection;
private readonly AgentInfo agentInfo; private readonly AgentInfo agentInfo;
private readonly string basePath; private readonly string basePath;
@ -36,7 +37,8 @@ sealed class InstanceSessionManager : IAsyncDisposable {
private uint instanceLoggerSequenceId = 0; private uint instanceLoggerSequenceId = 0;
public InstanceSessionManager(AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) { public InstanceSessionManager(ControllerConnection controllerConnection, AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) {
this.controllerConnection = controllerConnection;
this.agentInfo = agentInfo; this.agentInfo = agentInfo;
this.basePath = agentFolders.InstancesFolderPath; this.basePath = agentFolders.InstancesFolderPath;
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token; this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
@ -45,7 +47,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository); var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository);
var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts); var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
this.instanceServices = new InstanceServices(taskManager, portManager, backupManager, launchServices); this.instanceServices = new InstanceServices(controllerConnection, taskManager, portManager, backupManager, launchServices);
} }
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) { private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) {
@ -146,7 +148,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
var runningInstances = GetRunningInstancesInternal(); var runningInstances = GetRunningInstancesInternal();
var runningInstanceCount = runningInstances.Length; var runningInstanceCount = runningInstances.Length;
var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation); var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
await ServerMessaging.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory)); await controllerConnection.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory));
} finally { } finally {
semaphore.Release(); semaphore.Release();
} }

View File

@ -27,7 +27,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
this.context = context; this.context = context;
this.Process = process; this.Process = process;
this.logSender = new InstanceLogSender(context.Services.TaskManager, configuration.InstanceGuid, context.ShortName); this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort); this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted; this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;

View File

@ -1,5 +1,4 @@
using Phantom.Agent.Rpc; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;

View File

@ -1,6 +1,5 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;

View File

@ -1,4 +1,5 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Agent; using Phantom.Agent;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Agent.Services; using Phantom.Agent.Services;
@ -6,7 +7,9 @@ using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
@ -46,19 +49,18 @@ try {
var (controllerCertificate, agentToken) = agentKey.Value; var (controllerCertificate, agentToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts); var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks));
MessageListener MessageListenerFactory(RpcConnectionToServer<IMessageToControllerListener> connection) {
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);
}
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
await agentServices.Initialize(); await agentServices.Initialize();
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate); var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken); var rpcTask = RpcClientRuntime.Launch(rpcSocket, agentInfo, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
await rpcTask.WaitAsync(shutdownCancellationToken); await rpcTask.WaitAsync(shutdownCancellationToken);
} finally { } finally {
@ -68,6 +70,8 @@ try {
rpcDisconnectSemaphore.Release(); rpcDisconnectSemaphore.Release();
await rpcTask; await rpcTask;
rpcDisconnectSemaphore.Dispose(); rpcDisconnectSemaphore.Dispose();
NetMQConfig.Cleanup();
} }
return 0; return 0;

View File

@ -11,7 +11,7 @@ public static class AgentMessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent))); public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController))); public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions(); public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() { static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0); ToAgent.Add<RegisterAgentSuccessMessage>(0);
@ -33,7 +33,7 @@ public static class AgentMessageRegistries {
ToController.Add<ReplyMessage>(127); ToController.Add<ReplyMessage>(127);
} }
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> { private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent; public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController; public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
@ -41,11 +41,7 @@ public static class AgentMessageRegistries {
return messageType == typeof(RegisterAgentMessage); return messageType == typeof(RegisterAgentMessage);
} }
public IMessage<IMessageToAgentListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) { public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply); return new ReplyMessage(sequenceId, serializedReply);
} }
} }

View File

@ -4,9 +4,7 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterAgentMessage( public sealed partial record UnregisterAgentMessage : IMessageToController {
[property: MemoryPackOrder(0)] Guid AgentGuid
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterAgent(this); return listener.HandleUnregisterAgent(this);
} }

View File

@ -10,28 +10,25 @@ public static class WebMessageRegistries {
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController))); public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb))); public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions(); public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() { static WebMessageRegistries() {
ToController.Add<RegisterWebMessage>(0);
ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1); ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1);
ToController.Add<ReplyMessage>(127); ToController.Add<ReplyMessage>(127);
ToWeb.Add<ReplyMessage>(127); ToWeb.Add<ReplyMessage>(127);
} }
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> { private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb; public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController; public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) { public bool IsRegistrationMessage(Type messageType) {
return false; return messageType == typeof(RegisterWebMessage);
} }
public IMessage<IMessageToWebListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) { public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply); return new ReplyMessage(sequenceId, serializedReply);
} }
} }

View File

@ -11,6 +11,13 @@ public sealed class RpcConnectionToClient<TListener> {
private readonly MessageRegistry<TListener> messageRegistry; private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker; private readonly MessageReplyTracker messageReplyTracker;
private volatile bool isAuthorized;
public bool IsAuthorized {
get => isAuthorized;
set => isAuthorized = value;
}
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed; internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
private bool isClosed; private bool isClosed;

View File

@ -1,6 +1,7 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
using Serilog.Events; using Serilog.Events;
@ -8,49 +9,28 @@ using Serilog.Events;
namespace Phantom.Controller.Rpc; namespace Phantom.Controller.Rpc;
public static class RpcRuntime { public static class RpcRuntime {
public static Task Launch<TClientListener, TServerListener>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcRuntime<TClientListener, TServerListener>.Launch(config, messageDefinitions, listenerFactory, cancellationToken); return RpcRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
} }
} }
internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<ServerSocket> { internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
return new RpcRuntime<TClientListener, TServerListener>(config, messageDefinitions, listenerFactory, cancellationToken).Launch(); var socket = RpcServerSocket.Connect(config);
return new RpcRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
} }
private static ServerSocket CreateSocket(RpcConfiguration config) { private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
return socket;
}
private readonly RpcConfiguration config;
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory; private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private RpcRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, CreateSocket(config)) { private RpcRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
this.config = config;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory; this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
protected override void Connect(ServerSocket socket) { protected override void Run(ServerSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
}
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var clients = new Dictionary<ulong, Client>(); var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
@ -62,12 +42,14 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
var (routingId, data) = socket.Receive(cancellationToken); var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) { if (data.Length == 0) {
LogMessageType(logger, routingId, data); LogMessageType(logger, routingId, data, messageType: null);
continue; continue;
} }
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (!clients.TryGetValue(routingId, out var client)) { if (!clients.TryGetValue(routingId, out var client)) {
if (!CheckIsRegistrationMessage(data, logger, routingId)) { if (!CheckIsRegistrationMessage(messageType, logger, routingId)) {
continue; continue;
} }
@ -78,7 +60,11 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
clients[routingId] = client; clients[routingId] = client;
} }
LogMessageType(logger, routingId, data); if (!client.Connection.IsAuthorized && !CheckIsRegistrationMessage(messageType, logger, routingId)) {
continue;
}
LogMessageType(logger, routingId, data, messageType);
messageDefinitions.ToServer.Handle(data, client); messageDefinitions.ToServer.Handle(data, client);
} }
@ -87,40 +73,40 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
} }
} }
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data) { private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) { if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return; return;
} }
if (data.Length > 0 && messageDefinitions.ToServer.TryGetType(data, out var type)) { if (data.Length > 0 && messageType != null) {
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", type.Name, data.Length, routingId); logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
} }
else { else {
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId); logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
} }
} }
private bool CheckIsRegistrationMessage(ReadOnlyMemory<byte> data, ILogger logger, uint routingId) { private bool CheckIsRegistrationMessage(Type? messageType, ILogger logger, uint routingId) {
if (messageDefinitions.ToServer.TryGetType(data, out var type) && messageDefinitions.IsRegistrationMessage(type)) { if (messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
return true; return true;
} }
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", type?.Name ?? "unknown message", routingId); logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", messageType?.Name ?? "unknown message", routingId);
return false; return false;
} }
private sealed class Client : MessageHandler<TServerListener> { private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; } public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) { public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.Connection = connection; this.Connection = connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
} }
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyToServerMessage(sequenceId, serializedReply)); return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
} }
} }
} }

View File

@ -39,8 +39,8 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
} }
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) { else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
var guid = message.AgentInfo.Guid; connection.IsAuthorized = true;
agentGuidWaiter.SetResult(guid); agentGuidWaiter.SetResult(message.AgentInfo.Guid);
} }
return NoReply.Instance; return NoReply.Instance;
@ -51,8 +51,11 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
} }
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) { public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentManager.UnregisterAgent(message.AgentGuid, connection)) { if (agentGuidWaiter.Task.IsCompleted) {
instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline); var agentGuid = agentGuidWaiter.Task.Result;
if (agentManager.UnregisterAgent(agentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(agentGuid, InstanceStatus.Offline);
}
} }
connection.Close(); connection.Close();

View File

@ -1,6 +1,5 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
namespace Phantom.Controller; namespace Phantom.Controller;

View File

@ -1,6 +1,5 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;

View File

@ -1,4 +1,5 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
@ -61,10 +62,14 @@ try {
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate); return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
} }
try {
await Task.WhenAll( await Task.WhenAll(
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken), RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
); );
} finally {
NetMQConfig.Cleanup();
}
return 0; return 0;
} catch (OperationCanceledException) { } catch (OperationCanceledException) {

View File

@ -149,8 +149,8 @@ The repository includes a [Rider](https://www.jetbrains.com/rider/) projects wit
- `Controller` starts the Controller. - `Controller` starts the Controller.
- `Web` starts the Web server. - `Web` starts the Web server.
- `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents. - `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents.
- `Controller + Agent` starts the Controller and Agent 1. - `Controller + Web + Agent` starts the Controller and Agent 1.
- `Controller + Agent x3` starts the Controller and Agent 1, 2, and 3. - `Controller + Web + Agent x3` starts the Controller and Agent 1, 2, and 3.
## Bootstrap ## Bootstrap

View File

@ -1,10 +1,9 @@
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public interface IMessageDefinitions<TClientListener, TServerListener> { public interface IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
MessageRegistry<TClientListener> ToClient { get; } MessageRegistry<TClientListener> ToClient { get; }
MessageRegistry<TServerListener> ToServer { get; } MessageRegistry<TServerListener> ToServer { get; }
bool IsRegistrationMessage(Type messageType); bool IsRegistrationMessage(Type messageType);
IMessage<TClientListener, NoReply> CreateReplyToServerMessage(uint sequenceId, byte[] serializedReply); TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
IMessage<TServerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply);
} }

View File

@ -4,14 +4,13 @@ using Serilog;
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public abstract class MessageHandler<TListener> { public abstract class MessageHandler<TListener> {
protected TListener Listener { get; } private readonly TListener listener;
private readonly ILogger logger; private readonly ILogger logger;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) { protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
this.Listener = listener; this.listener = listener;
this.logger = logger; this.logger = logger;
this.taskManager = taskManager; this.taskManager = taskManager;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
@ -29,8 +28,7 @@ public abstract class MessageHandler<TListener> {
} }
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply = await message.Accept(Listener); TReply reply = await message.Accept(listener);
if (reply is not NoReply) { if (reply is not NoReply) {
await SendReply(sequenceId, MessageSerializer.Serialize(reply)); await SendReply(sequenceId, MessageSerializer.Serialize(reply));
} }

View File

@ -1,56 +1,34 @@
using NetMQ; using NetMQ.Sockets;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events; using Serilog.Events;
using ILogger = Serilog.ILogger;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public abstract class RpcClientRuntime<TClientListener, TServerListener, THelloMessage> : RpcRuntime<ClientSocket> where THelloMessage : IMessage<TServerListener, NoReply> { public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
private static ClientSocket CreateSocket(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, THelloMessage helloMessage) { private readonly RpcConnectionToServer<TServerListener> connection;
var socket = new ClientSocket(); private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
var options = socket.Options; private readonly TClientListener messageListener;
options.CurveServerCertificate = config.ServerCertificate;
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
return socket;
}
private readonly RpcConfiguration config;
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
private readonly Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory;
private readonly SemaphoreSlim disconnectSemaphore; private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken; private readonly CancellationToken receiveCancellationToken;
protected RpcClientRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory, THelloMessage helloMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, CreateSocket(config, messageDefinitions, helloMessage)) { protected RpcClientRuntime(RpcClientSocket<TClientListener, TServerListener, TReplyMessage> socket, TClientListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
this.config = config; this.connection = socket.Connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = socket.MessageDefinitions;
this.messageListenerFactory = messageListenerFactory; this.messageListener = messageListener;
this.disconnectSemaphore = disconnectSemaphore; this.disconnectSemaphore = disconnectSemaphore;
this.receiveCancellationToken = receiveCancellationToken; this.receiveCancellationToken = receiveCancellationToken;
} }
protected sealed override void Connect(ClientSocket socket) { protected sealed override void Run(ClientSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger; RunWithConnection(socket, connection, logger, taskManager);
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
} }
protected sealed override void Run(ClientSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) { protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, ILogger logger, TaskManager taskManager) {
var connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, replyTracker); var handler = new Handler(connection, messageDefinitions, messageListener, logger, taskManager, receiveCancellationToken);
RunWithConnection(socket, connection, taskManager);
}
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var handler = new Handler(connection, messageDefinitions, messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
try { try {
while (!receiveCancellationToken.IsCancellationRequested) { while (!receiveCancellationToken.IsCancellationRequested) {
@ -85,15 +63,15 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, THelloM
private sealed class Handler : MessageHandler<TClientListener> { private sealed class Handler : MessageHandler<TClientListener> {
private readonly RpcConnectionToServer<TServerListener> connection; private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) { public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.connection = connection; this.connection = connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
} }
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(messageDefinitions.CreateReplyToClientMessage(sequenceId, serializedReply)); return connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
} }
} }
} }

View File

@ -9,7 +9,7 @@ public sealed class RpcConnectionToServer<TListener> {
private readonly MessageRegistry<TListener> messageRegistry; private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
public RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) { internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
this.socket = socket; this.socket = socket;
this.messageRegistry = messageRegistry; this.messageRegistry = messageRegistry;
this.replyTracker = replyTracker; this.replyTracker = replyTracker;

View File

@ -1,39 +1,28 @@
using NetMQ; using NetMQ;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
static class RpcRuntime {
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
// TODO test behavior when either agent or server are offline for a very long time
options.DelayAttachOnConnect = true;
options.ReceiveHighWatermark = 10_000;
options.SendHighWatermark = 10_000;
}
}
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket { public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
private readonly TSocket socket; private readonly TSocket socket;
private readonly ILogger runtimeLogger; private readonly ILogger runtimeLogger;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
protected RpcRuntime(RpcConfiguration configuration, TSocket socket) { protected RpcRuntime(RpcSocket<TSocket> socket) {
RpcRuntime.SetDefaultSocketOptions(socket.Options); this.socket = socket.Socket;
this.socket = socket; this.runtimeLogger = socket.Config.RuntimeLogger;
this.runtimeLogger = configuration.RuntimeLogger; this.replyTracker = socket.ReplyTracker;
this.replyTracker = new MessageReplyTracker(runtimeLogger); this.taskManager = new TaskManager(socket.Config.TaskManagerLogger);
this.taskManager = new TaskManager(configuration.TaskManagerLogger);
} }
protected async Task Launch() { protected async Task Launch() {
Connect(socket);
void RunTask() { void RunTask() {
try { try {
Run(socket, replyTracker, taskManager); Run(socket, runtimeLogger, replyTracker, taskManager);
} catch (Exception e) { } catch (Exception e) {
runtimeLogger.Error(e, "Caught exception in RPC thread."); runtimeLogger.Error(e, "Caught exception in RPC thread.");
} }
@ -42,21 +31,19 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
try { try {
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// ignore // Ignore.
} finally { } finally {
await taskManager.Stop(); await taskManager.Stop();
await Disconnect(socket); await Disconnect(socket, runtimeLogger);
socket.Dispose(); socket.Dispose();
NetMQConfig.Cleanup(); runtimeLogger.Information("ZeroMQ runtime stopped.");
runtimeLogger.Information("ZeroMQ client stopped.");
} }
} }
protected abstract void Connect(TSocket socket); protected abstract void Run(TSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager);
protected abstract void Run(TSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager);
protected virtual Task Disconnect(TSocket socket) { protected virtual Task Disconnect(TSocket socket, ILogger logger) {
return Task.CompletedTask; return Task.CompletedTask;
} }
} }

View File

@ -0,0 +1,40 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Sockets;
public static class RpcClientSocket {
public static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<TClientListener, TServerListener, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcClientSocket<TClientListener, TServerListener, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
}
}
public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> {
var socket = new ClientSocket();
var options = socket.Options;
options.CurveServerCertificate = config.ServerCertificate;
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl;
var logger = config.RuntimeLogger;
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
}
public RpcConnectionToServer<TServerListener> Connection { get; }
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
MessageDefinitions = messageDefinitions;
Connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, ReplyTracker);
}
}

View File

@ -0,0 +1,25 @@
using NetMQ.Sockets;
namespace Phantom.Utils.Rpc.Sockets;
public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
public static RpcServerSocket Connect(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl;
var logger = config.RuntimeLogger;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
return new RpcServerSocket(socket, config);
}
private RpcServerSocket(ServerSocket socket, RpcConfiguration config) : base(socket, config) {}
}

View File

@ -0,0 +1,25 @@
using NetMQ;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Sockets;
static class RpcSocket {
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
// TODO test behavior when either agent or server are offline for a very long time
options.DelayAttachOnConnect = true;
options.ReceiveHighWatermark = 10_000;
options.SendHighWatermark = 10_000;
}
}
public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
internal TSocket Socket { get; }
internal RpcConfiguration Config { get; }
internal MessageReplyTracker ReplyTracker { get; }
protected RpcSocket(TSocket socket, RpcConfiguration config) {
Socket = socket;
Config = config;
ReplyTracker = new MessageReplyTracker(config.RuntimeLogger);
}
}

View File

@ -1,21 +1,23 @@
using Microsoft.AspNetCore.Authentication.Cookies; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Components.Authorization; using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Server; using Microsoft.AspNetCore.Components.Server;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Web.Services.Authentication; using Phantom.Web.Services.Authentication;
using Phantom.Web.Services.Authorization; using Phantom.Web.Services.Authorization;
using Phantom.Web.Services.Rpc;
namespace Phantom.Web.Services; namespace Phantom.Web.Services;
public static class PhantomWebServices { public static class PhantomWebServices {
public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) { public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) {
services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme).AddCookie(ConfigureIdentityCookie); services.AddSingleton<MessageListener>();
services.AddAuthorization(ConfigureAuthorization); services.AddSingleton<ControllerCommunication>();
services.AddSingleton<PermissionManager>();
services.AddSingleton(PhantomLoginStore.Create(cancellationToken)); services.AddSingleton(PhantomLoginStore.Create(cancellationToken));
services.AddScoped<PhantomLoginManager>(); services.AddScoped<PhantomLoginManager>();
services.AddAuthorization(ConfigureAuthorization);
services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>(); services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>();
services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>(); services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>();
} }
@ -26,19 +28,6 @@ public static class PhantomWebServices {
application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>()); application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>());
} }
private static void ConfigureIdentityCookie(CookieAuthenticationOptions o) {
o.Cookie.Name = "Phantom.Identity";
o.Cookie.HttpOnly = true;
o.Cookie.SameSite = SameSiteMode.Lax;
o.ExpireTimeSpan = TimeSpan.FromDays(30);
o.SlidingExpiration = true;
o.LoginPath = PhantomIdentityMiddleware.LoginPath;
o.LogoutPath = PhantomIdentityMiddleware.LogoutPath;
o.AccessDeniedPath = PhantomIdentityMiddleware.LoginPath;
}
private static void ConfigureAuthorization(AuthorizationOptions o) { private static void ConfigureAuthorization(AuthorizationOptions o) {
foreach (var permission in Permission.All) { foreach (var permission in Permission.All) {
o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission))); o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission)));

View File

@ -1,7 +1,7 @@
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
namespace Phantom.Web.Services; namespace Phantom.Web.Services.Rpc;
public sealed class ControllerCommunication { public sealed class ControllerCommunication {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection; private readonly RpcConnectionToServer<IMessageToControllerListener> connection;

View File

@ -0,0 +1,19 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Web.Services.Rpc;
public sealed class MessageListener : IMessageToWebListener {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection) {
this.connection = connection;
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -1,22 +1,14 @@
using NetMQ; using Phantom.Common.Messages.Web;
using NetMQ.Sockets;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog.Events;
using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener> { public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
public static Task Launch(RpcConfiguration config, AuthToken authToken, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) { public static Task Launch(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(config, listenerFactory, new RegisterWebMessage(authToken), disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
} }
private RpcClientRuntime(RpcConfiguration config, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> messageListenerFactory, RegisterWebMessage registerWebMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, WebMessageRegistries.Definitions, messageListenerFactory, registerWebMessage, disconnectSemaphore, receiveCancellationToken) {} private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
} }

View File

@ -2,6 +2,6 @@
namespace Phantom.Web; namespace Phantom.Web;
public sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) { sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) {
public string HttpUrl => "http://" + Host + ":" + Port; public string HttpUrl => "http://" + Host + ":" + Port;
} }

View File

@ -1,8 +1,12 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web; using Phantom.Web;
@ -46,22 +50,27 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var administratorToken = TokenGenerator.Create(60);
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web")); var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web"));
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
var webApplication = WebLauncher.CreateApplication(configuration, taskManager, serviceConfiguration, rpcSocket.Connection);
MessageListener messageListener;
await using (var scope = webApplication.Services.CreateAsyncScope()) {
messageListener = scope.ServiceProvider.GetRequiredService<MessageListener>();
}
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate); var rpcTask = RpcClientRuntime.Launch(rpcSocket, messageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, webToken, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var administratorToken = TokenGenerator.Create(60);
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken); PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup"); PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup");
await WebLauncher.Launch(configuration, webApplication);
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
var webApplication = Launcher.CreateApplication(configuration, serviceConfiguration, taskManager);
await Launcher.Launch(configuration, webApplication);
} finally { } finally {
shutdownCancellationTokenSource.Cancel(); shutdownCancellationTokenSource.Cancel();
await taskManager.Stop(); await taskManager.Stop();
@ -69,6 +78,8 @@ try {
rpcDisconnectSemaphore.Release(); rpcDisconnectSemaphore.Release();
await rpcTask; await rpcTask;
rpcDisconnectSemaphore.Dispose(); rpcDisconnectSemaphore.Dispose();
NetMQConfig.Cleanup();
} }
return 0; return 0;

View File

@ -1,6 +1,5 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;

View File

@ -1,4 +1,6 @@
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web.Base; using Phantom.Web.Base;
using Phantom.Web.Services; using Phantom.Web.Services;
@ -6,9 +8,9 @@ using Serilog;
namespace Phantom.Web; namespace Phantom.Web;
static class Launcher { static class WebLauncher {
public static WebApplication CreateApplication(Configuration config, ServiceConfiguration serviceConfiguration, TaskManager taskManager) { public static WebApplication CreateApplication(Configuration config, TaskManager taskManager, ServiceConfiguration serviceConfiguration, RpcConnectionToServer<IMessageToControllerListener> controllerConnection) {
var assembly = typeof(Launcher).Assembly; var assembly = typeof(WebLauncher).Assembly;
var builder = WebApplication.CreateBuilder(new WebApplicationOptions { var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
ApplicationName = assembly.GetName().Name, ApplicationName = assembly.GetName().Name,
ContentRootPath = Path.GetDirectoryName(assembly.Location) ContentRootPath = Path.GetDirectoryName(assembly.Location)
@ -23,8 +25,9 @@ static class Launcher {
builder.WebHost.UseStaticWebAssets(); builder.WebHost.UseStaticWebAssets();
} }
builder.Services.AddSingleton(serviceConfiguration);
builder.Services.AddSingleton(taskManager); builder.Services.AddSingleton(taskManager);
builder.Services.AddSingleton(serviceConfiguration);
builder.Services.AddSingleton(controllerConnection);
builder.Services.AddPhantomServices(config.CancellationToken); builder.Services.AddPhantomServices(config.CancellationToken);
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime()); builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());