1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-18 15:42:50 +02:00

Compare commits

..

4 Commits

41 changed files with 277 additions and 328 deletions

View File

@ -1,10 +1,9 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="Controller + Web + Agent x3" type="CompoundRunConfigurationType"> <configuration default="false" name="Controller + Agent x3" type="CompoundRunConfigurationType">
<toRun name="Agent 1" type="DotNetProject" /> <toRun name="Agent 1" type="DotNetProject" />
<toRun name="Agent 2" type="DotNetProject" /> <toRun name="Agent 2" type="DotNetProject" />
<toRun name="Agent 3" type="DotNetProject" /> <toRun name="Agent 3" type="DotNetProject" />
<toRun name="Controller" type="DotNetProject" /> <toRun name="Controller" type="DotNetProject" />
<toRun name="Web" type="DotNetProject" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
</component> </component>

View File

@ -1,8 +1,7 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="Controller + Web + Agent" type="CompoundRunConfigurationType"> <configuration default="false" name="Controller + Agent" type="CompoundRunConfigurationType">
<toRun name="Agent 1" type="DotNetProject" /> <toRun name="Agent 1" type="DotNetProject" />
<toRun name="Controller" type="DotNetProject" /> <toRun name="Controller" type="DotNetProject" />
<toRun name="Web" type="DotNetProject" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
</component> </component>

View File

@ -6,8 +6,6 @@
<option name="PASS_PARENT_ENVS" value="1" /> <option name="PASS_PARENT_ENVS" value="1" />
<envs> <envs>
<env name="ASPNETCORE_ENVIRONMENT" value="Development" /> <env name="ASPNETCORE_ENVIRONMENT" value="Development" />
<env name="CONTROLLER_HOST" value="localhost" />
<env name="WEB_KEY" value="BMNHM9RRPMCBBY29D9XHS6KBKZSRY7F5XFN27YZX96XXWJC2NM2D6YRHM9PZN9JGQGCSJ6FMB2GGZ" />
<env name="WEB_SERVER_HOST" value="localhost" /> <env name="WEB_SERVER_HOST" value="localhost" />
</envs> </envs>
<option name="USE_EXTERNAL_CONSOLE" value="0" /> <option name="USE_EXTERNAL_CONSOLE" value="0" />

View File

@ -1,25 +0,0 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Agent.Rpc;
public sealed class ControllerConnection {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
this.connection = connection;
Logger.Information("Connection ready.");
}
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.Send(message);
}
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
}

View File

@ -2,37 +2,42 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> { public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, RegisterAgentMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, AgentInfo agentInfo, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) { public static Task Launch(RpcConfiguration config, AuthToken authToken, AgentInfo agentInfo, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcClientRuntime(config, agentInfo.Guid, listenerFactory, new RegisterAgentMessage(authToken, agentInfo), disconnectSemaphore, receiveCancellationToken).Launch();
} }
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {} private readonly RpcConfiguration config;
private readonly Guid agentGuid;
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, ILogger logger, TaskManager taskManager) { private RpcClientRuntime(RpcConfiguration config, Guid agentGuid, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> messageListenerFactory, RegisterAgentMessage registerAgentMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, AgentMessageRegistries.Definitions, messageListenerFactory, registerAgentMessage, disconnectSemaphore, receiveCancellationToken) {
this.config = config;
this.agentGuid = agentGuid;
}
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, TaskManager taskManager) {
ServerMessaging.SetCurrentConnection(connection);
var keepAliveLoop = new KeepAliveLoop(connection); var keepAliveLoop = new KeepAliveLoop(connection);
try { try {
base.RunWithConnection(socket, connection, logger, taskManager); base.RunWithConnection(socket, connection, taskManager);
} finally { } finally {
keepAliveLoop.Cancel(); keepAliveLoop.Cancel();
} }
} }
protected override async Task Disconnect(ClientSocket socket, ILogger logger) { protected override async Task Disconnect(ClientSocket socket) {
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray(); var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage(agentGuid)).ToArray();
try { try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None); await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
} catch (TimeoutException) { } catch (TimeoutException) {
logger.Error("Timed out communicating agent shutdown with the controller."); config.RuntimeLogger.Error("Timed out communicating agent shutdown with the controller.");
} }
} }
} }

View File

@ -0,0 +1,35 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc;
using Serilog;
namespace Phantom.Agent.Rpc;
public static class ServerMessaging {
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging));
private static RpcConnectionToServer<IMessageToControllerListener>? CurrentConnection { get; set; }
private static RpcConnectionToServer<IMessageToControllerListener> CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
private static readonly object SetCurrentConnectionLock = new ();
internal static void SetCurrentConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
lock (SetCurrentConnectionLock) {
if (CurrentConnection != null) {
throw new InvalidOperationException("Server connection can only be set once.");
}
CurrentConnection = connection;
}
Logger.Information("Server connection ready.");
}
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return CurrentConnectionOrThrow.Send(message);
}
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
}

View File

@ -1,5 +1,4 @@
using Phantom.Agent.Minecraft.Java; using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
@ -19,12 +18,12 @@ public sealed class AgentServices {
internal JavaRuntimeRepository JavaRuntimeRepository { get; } internal JavaRuntimeRepository JavaRuntimeRepository { get; }
internal InstanceSessionManager InstanceSessionManager { get; } internal InstanceSessionManager InstanceSessionManager { get; }
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) { public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration) {
this.AgentFolders = agentFolders; this.AgentFolders = agentFolders;
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>()); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks); this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
this.JavaRuntimeRepository = new JavaRuntimeRepository(); this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceSessionManager = new InstanceSessionManager(controllerConnection, agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager); this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
} }
public async Task Initialize() { public async Task Initialize() {

View File

@ -1,4 +1,5 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Instances.Procedures; using Phantom.Agent.Services.Instances.Procedures;
using Phantom.Agent.Services.Instances.States; using Phantom.Agent.Services.Instances.States;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
@ -56,7 +57,7 @@ sealed class Instance : IAsyncDisposable {
public void ReportLastStatus() { public void ReportLastStatus() {
TryUpdateStatus("Report last status of instance " + shortName, async () => { TryUpdateStatus("Report last status of instance " + shortName, async () => {
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus)); await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
}); });
} }
@ -64,14 +65,14 @@ sealed class Instance : IAsyncDisposable {
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => { TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => {
if (status != currentStatus) { if (status != currentStatus) {
currentStatus = status; currentStatus = status;
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status)); await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
} }
}); });
} }
private void ReportEvent(IInstanceEvent instanceEvent) { private void ReportEvent(IInstanceEvent instanceEvent) {
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent); var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent);
Services.TaskManager.Run("Report event for instance " + shortName, async () => await Services.ControllerConnection.Send(message)); Services.TaskManager.Run("Report event for instance " + shortName, async () => await ServerMessaging.Send(message));
} }
internal void TransitionState(IInstanceState newState) { internal void TransitionState(IInstanceState newState) {

View File

@ -16,14 +16,12 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200); private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
private readonly ControllerConnection controllerConnection;
private readonly Guid instanceGuid; private readonly Guid instanceGuid;
private readonly Channel<string> outputChannel; private readonly Channel<string> outputChannel;
private int droppedLinesSinceLastSend; private int droppedLinesSinceLastSend;
public InstanceLogSender(ControllerConnection controllerConnection, TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) { public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
this.controllerConnection = controllerConnection;
this.instanceGuid = instanceGuid; this.instanceGuid = instanceGuid;
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped); this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
Start(); Start();
@ -63,7 +61,7 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
private async Task SendOutputToServer(ImmutableArray<string> lines) { private async Task SendOutputToServer(ImmutableArray<string> lines) {
if (!lines.IsEmpty) { if (!lines.IsEmpty) {
await controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines)); await ServerMessaging.Send(new InstanceOutputMessage(instanceGuid, lines));
} }
} }

View File

@ -1,8 +1,7 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
sealed record InstanceServices(ControllerConnection ControllerConnection, TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices); sealed record InstanceServices(TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices);

View File

@ -24,7 +24,6 @@ namespace Phantom.Agent.Services.Instances;
sealed class InstanceSessionManager : IAsyncDisposable { sealed class InstanceSessionManager : IAsyncDisposable {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>(); private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>();
private readonly ControllerConnection controllerConnection;
private readonly AgentInfo agentInfo; private readonly AgentInfo agentInfo;
private readonly string basePath; private readonly string basePath;
@ -37,8 +36,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
private uint instanceLoggerSequenceId = 0; private uint instanceLoggerSequenceId = 0;
public InstanceSessionManager(ControllerConnection controllerConnection, AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) { public InstanceSessionManager(AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) {
this.controllerConnection = controllerConnection;
this.agentInfo = agentInfo; this.agentInfo = agentInfo;
this.basePath = agentFolders.InstancesFolderPath; this.basePath = agentFolders.InstancesFolderPath;
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token; this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
@ -47,7 +45,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository); var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository);
var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts); var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
this.instanceServices = new InstanceServices(controllerConnection, taskManager, portManager, backupManager, launchServices); this.instanceServices = new InstanceServices(taskManager, portManager, backupManager, launchServices);
} }
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) { private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) {
@ -148,7 +146,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
var runningInstances = GetRunningInstancesInternal(); var runningInstances = GetRunningInstancesInternal();
var runningInstanceCount = runningInstances.Length; var runningInstanceCount = runningInstances.Length;
var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation); var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
await controllerConnection.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory)); await ServerMessaging.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory));
} finally { } finally {
semaphore.Release(); semaphore.Release();
} }

View File

@ -27,7 +27,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
this.context = context; this.context = context;
this.Process = process; this.Process = process;
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName); this.logSender = new InstanceLogSender(context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort); this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted; this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;

View File

@ -1,4 +1,5 @@
using Phantom.Common.Data.Instance; using Phantom.Agent.Rpc;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;

View File

@ -1,5 +1,6 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;

View File

@ -1,5 +1,4 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Agent; using Phantom.Agent;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Agent.Services; using Phantom.Agent.Services;
@ -7,9 +6,7 @@ using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
@ -49,18 +46,19 @@ try {
var (controllerCertificate, agentToken) = agentKey.Value; var (controllerCertificate, agentToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts); var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks));
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
MessageListener MessageListenerFactory(RpcConnectionToServer<IMessageToControllerListener> connection) {
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate); return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo)); }
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
await agentServices.Initialize(); await agentServices.Initialize();
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource); var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, agentInfo, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken); var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
await rpcTask.WaitAsync(shutdownCancellationToken); await rpcTask.WaitAsync(shutdownCancellationToken);
} finally { } finally {
@ -70,8 +68,6 @@ try {
rpcDisconnectSemaphore.Release(); rpcDisconnectSemaphore.Release();
await rpcTask; await rpcTask;
rpcDisconnectSemaphore.Dispose(); rpcDisconnectSemaphore.Dispose();
NetMQConfig.Cleanup();
} }
return 0; return 0;

View File

@ -11,7 +11,7 @@ public static class AgentMessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent))); public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController))); public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions(); public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() { static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0); ToAgent.Add<RegisterAgentSuccessMessage>(0);
@ -33,7 +33,7 @@ public static class AgentMessageRegistries {
ToController.Add<ReplyMessage>(127); ToController.Add<ReplyMessage>(127);
} }
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> { private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> {
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent; public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController; public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
@ -41,7 +41,11 @@ public static class AgentMessageRegistries {
return messageType == typeof(RegisterAgentMessage); return messageType == typeof(RegisterAgentMessage);
} }
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) { public IMessage<IMessageToAgentListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply); return new ReplyMessage(sequenceId, serializedReply);
} }
} }

View File

@ -4,7 +4,9 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterAgentMessage : IMessageToController { public sealed partial record UnregisterAgentMessage(
[property: MemoryPackOrder(0)] Guid AgentGuid
) : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterAgent(this); return listener.HandleUnregisterAgent(this);
} }

View File

@ -10,25 +10,28 @@ public static class WebMessageRegistries {
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController))); public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb))); public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions(); public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() { static WebMessageRegistries() {
ToController.Add<RegisterWebMessage>(0);
ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1); ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1);
ToController.Add<ReplyMessage>(127); ToController.Add<ReplyMessage>(127);
ToWeb.Add<ReplyMessage>(127); ToWeb.Add<ReplyMessage>(127);
} }
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> { private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> {
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb; public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
public MessageRegistry<IMessageToControllerListener> ToServer => ToController; public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
public bool IsRegistrationMessage(Type messageType) { public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterWebMessage); return false;
} }
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) { public IMessage<IMessageToWebListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply); return new ReplyMessage(sequenceId, serializedReply);
} }
} }

View File

@ -11,13 +11,6 @@ public sealed class RpcConnectionToClient<TListener> {
private readonly MessageRegistry<TListener> messageRegistry; private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker; private readonly MessageReplyTracker messageReplyTracker;
private volatile bool isAuthorized;
public bool IsAuthorized {
get => isAuthorized;
set => isAuthorized = value;
}
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed; internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
private bool isClosed; private bool isClosed;

View File

@ -1,7 +1,6 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
using Serilog.Events; using Serilog.Events;
@ -9,28 +8,49 @@ using Serilog.Events;
namespace Phantom.Controller.Rpc; namespace Phantom.Controller.Rpc;
public static class RpcRuntime { public static class RpcRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public static Task Launch<TClientListener, TServerListener>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
return RpcRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken); return RpcRuntime<TClientListener, TServerListener>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
} }
} }
internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<ServerSocket> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config); return new RpcRuntime<TClientListener, TServerListener>(config, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
} }
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private static ServerSocket CreateSocket(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
return socket;
}
private readonly RpcConfiguration config;
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory; private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private RpcRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) { private RpcRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, CreateSocket(config)) {
this.config = config;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory; this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
protected override void Run(ServerSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) { protected override void Connect(ServerSocket socket) {
var logger = config.RuntimeLogger;
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
}
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var clients = new Dictionary<ulong, Client>(); var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
@ -42,14 +62,12 @@ internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage
var (routingId, data) = socket.Receive(cancellationToken); var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) { if (data.Length == 0) {
LogMessageType(logger, routingId, data, messageType: null); LogMessageType(logger, routingId, data);
continue; continue;
} }
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (!clients.TryGetValue(routingId, out var client)) { if (!clients.TryGetValue(routingId, out var client)) {
if (!CheckIsRegistrationMessage(messageType, logger, routingId)) { if (!CheckIsRegistrationMessage(data, logger, routingId)) {
continue; continue;
} }
@ -60,11 +78,7 @@ internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage
clients[routingId] = client; clients[routingId] = client;
} }
if (!client.Connection.IsAuthorized && !CheckIsRegistrationMessage(messageType, logger, routingId)) { LogMessageType(logger, routingId, data);
continue;
}
LogMessageType(logger, routingId, data, messageType);
messageDefinitions.ToServer.Handle(data, client); messageDefinitions.ToServer.Handle(data, client);
} }
@ -73,40 +87,40 @@ internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage
} }
} }
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data, Type? messageType) { private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) { if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return; return;
} }
if (data.Length > 0 && messageType != null) { if (data.Length > 0 && messageDefinitions.ToServer.TryGetType(data, out var type)) {
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId); logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", type.Name, data.Length, routingId);
} }
else { else {
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId); logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
} }
} }
private bool CheckIsRegistrationMessage(Type? messageType, ILogger logger, uint routingId) { private bool CheckIsRegistrationMessage(ReadOnlyMemory<byte> data, ILogger logger, uint routingId) {
if (messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) { if (messageDefinitions.ToServer.TryGetType(data, out var type) && messageDefinitions.IsRegistrationMessage(type)) {
return true; return true;
} }
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", messageType?.Name ?? "unknown message", routingId); logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", type?.Name ?? "unknown message", routingId);
return false; return false;
} }
private sealed class Client : MessageHandler<TServerListener> { private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; } public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) { public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.Connection = connection; this.Connection = connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
} }
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply)); return Connection.Send(messageDefinitions.CreateReplyToServerMessage(sequenceId, serializedReply));
} }
} }
} }

View File

@ -39,8 +39,8 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
} }
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) { else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
connection.IsAuthorized = true; var guid = message.AgentInfo.Guid;
agentGuidWaiter.SetResult(message.AgentInfo.Guid); agentGuidWaiter.SetResult(guid);
} }
return NoReply.Instance; return NoReply.Instance;
@ -51,11 +51,8 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
} }
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) { public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted) { if (agentManager.UnregisterAgent(message.AgentGuid, connection)) {
var agentGuid = agentGuidWaiter.Task.Result; instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline);
if (agentManager.UnregisterAgent(agentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(agentGuid, InstanceStatus.Offline);
}
} }
connection.Close(); connection.Close();

View File

@ -1,5 +1,6 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
namespace Phantom.Controller; namespace Phantom.Controller;

View File

@ -1,5 +1,6 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;

View File

@ -1,5 +1,4 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
@ -62,14 +61,10 @@ try {
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate); return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
} }
try { await Task.WhenAll(
await Task.WhenAll( RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken), RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) );
);
} finally {
NetMQConfig.Cleanup();
}
return 0; return 0;
} catch (OperationCanceledException) { } catch (OperationCanceledException) {

View File

@ -149,8 +149,8 @@ The repository includes a [Rider](https://www.jetbrains.com/rider/) projects wit
- `Controller` starts the Controller. - `Controller` starts the Controller.
- `Web` starts the Web server. - `Web` starts the Web server.
- `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents. - `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents.
- `Controller + Web + Agent` starts the Controller and Agent 1. - `Controller + Agent` starts the Controller and Agent 1.
- `Controller + Web + Agent x3` starts the Controller and Agent 1, 2, and 3. - `Controller + Agent x3` starts the Controller and Agent 1, 2, and 3.
## Bootstrap ## Bootstrap

View File

@ -1,9 +1,10 @@
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public interface IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public interface IMessageDefinitions<TClientListener, TServerListener> {
MessageRegistry<TClientListener> ToClient { get; } MessageRegistry<TClientListener> ToClient { get; }
MessageRegistry<TServerListener> ToServer { get; } MessageRegistry<TServerListener> ToServer { get; }
bool IsRegistrationMessage(Type messageType); bool IsRegistrationMessage(Type messageType);
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply); IMessage<TClientListener, NoReply> CreateReplyToServerMessage(uint sequenceId, byte[] serializedReply);
IMessage<TServerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply);
} }

View File

@ -4,13 +4,14 @@ using Serilog;
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public abstract class MessageHandler<TListener> { public abstract class MessageHandler<TListener> {
private readonly TListener listener; protected TListener Listener { get; }
private readonly ILogger logger; private readonly ILogger logger;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) { protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
this.listener = listener; this.Listener = listener;
this.logger = logger; this.logger = logger;
this.taskManager = taskManager; this.taskManager = taskManager;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
@ -28,11 +29,12 @@ public abstract class MessageHandler<TListener> {
} }
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply = await message.Accept(listener); TReply reply = await message.Accept(Listener);
if (reply is not NoReply) { if (reply is not NoReply) {
await SendReply(sequenceId, MessageSerializer.Serialize(reply)); await SendReply(sequenceId, MessageSerializer.Serialize(reply));
} }
} }
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply); protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
} }

View File

@ -1,34 +1,56 @@
using NetMQ.Sockets; using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events; using Serilog.Events;
using ILogger = Serilog.ILogger;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public abstract class RpcClientRuntime<TClientListener, TServerListener, THelloMessage> : RpcRuntime<ClientSocket> where THelloMessage : IMessage<TServerListener, NoReply> {
private readonly RpcConnectionToServer<TServerListener> connection; private static ClientSocket CreateSocket(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, THelloMessage helloMessage) {
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; var socket = new ClientSocket();
private readonly TClientListener messageListener; var options = socket.Options;
options.CurveServerCertificate = config.ServerCertificate;
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
return socket;
}
private readonly RpcConfiguration config;
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
private readonly Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory;
private readonly SemaphoreSlim disconnectSemaphore; private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken; private readonly CancellationToken receiveCancellationToken;
protected RpcClientRuntime(RpcClientSocket<TClientListener, TServerListener, TReplyMessage> socket, TClientListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) { protected RpcClientRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory, THelloMessage helloMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, CreateSocket(config, messageDefinitions, helloMessage)) {
this.connection = socket.Connection; this.config = config;
this.messageDefinitions = socket.MessageDefinitions; this.messageDefinitions = messageDefinitions;
this.messageListener = messageListener; this.messageListenerFactory = messageListenerFactory;
this.disconnectSemaphore = disconnectSemaphore; this.disconnectSemaphore = disconnectSemaphore;
this.receiveCancellationToken = receiveCancellationToken; this.receiveCancellationToken = receiveCancellationToken;
} }
protected sealed override void Run(ClientSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) { protected sealed override void Connect(ClientSocket socket) {
RunWithConnection(socket, connection, logger, taskManager); var logger = config.RuntimeLogger;
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
} }
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, ILogger logger, TaskManager taskManager) { protected sealed override void Run(ClientSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var handler = new Handler(connection, messageDefinitions, messageListener, logger, taskManager, receiveCancellationToken); var connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, replyTracker);
RunWithConnection(socket, connection, taskManager);
}
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var handler = new Handler(connection, messageDefinitions, messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
try { try {
while (!receiveCancellationToken.IsCancellationRequested) { while (!receiveCancellationToken.IsCancellationRequested) {
@ -63,15 +85,15 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
private sealed class Handler : MessageHandler<TClientListener> { private sealed class Handler : MessageHandler<TClientListener> {
private readonly RpcConnectionToServer<TServerListener> connection; private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) { public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.connection = connection; this.connection = connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
} }
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply)); return connection.Send(messageDefinitions.CreateReplyToClientMessage(sequenceId, serializedReply));
} }
} }
} }

View File

@ -9,7 +9,7 @@ public sealed class RpcConnectionToServer<TListener> {
private readonly MessageRegistry<TListener> messageRegistry; private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) { public RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
this.socket = socket; this.socket = socket;
this.messageRegistry = messageRegistry; this.messageRegistry = messageRegistry;
this.replyTracker = replyTracker; this.replyTracker = replyTracker;

View File

@ -1,28 +1,39 @@
using NetMQ; using NetMQ;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
static class RpcRuntime {
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
// TODO test behavior when either agent or server are offline for a very long time
options.DelayAttachOnConnect = true;
options.ReceiveHighWatermark = 10_000;
options.SendHighWatermark = 10_000;
}
}
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket { public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
private readonly TSocket socket; private readonly TSocket socket;
private readonly ILogger runtimeLogger; private readonly ILogger runtimeLogger;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
protected RpcRuntime(RpcSocket<TSocket> socket) { protected RpcRuntime(RpcConfiguration configuration, TSocket socket) {
this.socket = socket.Socket; RpcRuntime.SetDefaultSocketOptions(socket.Options);
this.runtimeLogger = socket.Config.RuntimeLogger; this.socket = socket;
this.replyTracker = socket.ReplyTracker; this.runtimeLogger = configuration.RuntimeLogger;
this.taskManager = new TaskManager(socket.Config.TaskManagerLogger); this.replyTracker = new MessageReplyTracker(runtimeLogger);
this.taskManager = new TaskManager(configuration.TaskManagerLogger);
} }
protected async Task Launch() { protected async Task Launch() {
Connect(socket);
void RunTask() { void RunTask() {
try { try {
Run(socket, runtimeLogger, replyTracker, taskManager); Run(socket, replyTracker, taskManager);
} catch (Exception e) { } catch (Exception e) {
runtimeLogger.Error(e, "Caught exception in RPC thread."); runtimeLogger.Error(e, "Caught exception in RPC thread.");
} }
@ -31,19 +42,21 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
try { try {
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // ignore
} finally { } finally {
await taskManager.Stop(); await taskManager.Stop();
await Disconnect(socket, runtimeLogger); await Disconnect(socket);
socket.Dispose(); socket.Dispose();
runtimeLogger.Information("ZeroMQ runtime stopped."); NetMQConfig.Cleanup();
runtimeLogger.Information("ZeroMQ client stopped.");
} }
} }
protected abstract void Run(TSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager); protected abstract void Connect(TSocket socket);
protected abstract void Run(TSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager);
protected virtual Task Disconnect(TSocket socket, ILogger logger) { protected virtual Task Disconnect(TSocket socket) {
return Task.CompletedTask; return Task.CompletedTask;
} }
} }

View File

@ -1,40 +0,0 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Sockets;
public static class RpcClientSocket {
public static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<TClientListener, TServerListener, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcClientSocket<TClientListener, TServerListener, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
}
}
public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> {
var socket = new ClientSocket();
var options = socket.Options;
options.CurveServerCertificate = config.ServerCertificate;
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl;
var logger = config.RuntimeLogger;
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
socket.Connect(url);
logger.Information("ZeroMQ client ready.");
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
}
public RpcConnectionToServer<TServerListener> Connection { get; }
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
MessageDefinitions = messageDefinitions;
Connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, ReplyTracker);
}
}

View File

@ -1,25 +0,0 @@
using NetMQ.Sockets;
namespace Phantom.Utils.Rpc.Sockets;
public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
public static RpcServerSocket Connect(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl;
var logger = config.RuntimeLogger;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
return new RpcServerSocket(socket, config);
}
private RpcServerSocket(ServerSocket socket, RpcConfiguration config) : base(socket, config) {}
}

View File

@ -1,25 +0,0 @@
using NetMQ;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Sockets;
static class RpcSocket {
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
// TODO test behavior when either agent or server are offline for a very long time
options.DelayAttachOnConnect = true;
options.ReceiveHighWatermark = 10_000;
options.SendHighWatermark = 10_000;
}
}
public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
internal TSocket Socket { get; }
internal RpcConfiguration Config { get; }
internal MessageReplyTracker ReplyTracker { get; }
protected RpcSocket(TSocket socket, RpcConfiguration config) {
Socket = socket;
Config = config;
ReplyTracker = new MessageReplyTracker(config.RuntimeLogger);
}
}

View File

@ -1,23 +1,21 @@
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Components.Authorization; using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Server; using Microsoft.AspNetCore.Components.Server;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Web.Services.Authentication; using Phantom.Web.Services.Authentication;
using Phantom.Web.Services.Authorization; using Phantom.Web.Services.Authorization;
using Phantom.Web.Services.Rpc;
namespace Phantom.Web.Services; namespace Phantom.Web.Services;
public static class PhantomWebServices { public static class PhantomWebServices {
public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) { public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) {
services.AddSingleton<MessageListener>(); services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme).AddCookie(ConfigureIdentityCookie);
services.AddSingleton<ControllerCommunication>(); services.AddAuthorization(ConfigureAuthorization);
services.AddSingleton<PermissionManager>();
services.AddSingleton(PhantomLoginStore.Create(cancellationToken)); services.AddSingleton(PhantomLoginStore.Create(cancellationToken));
services.AddScoped<PhantomLoginManager>(); services.AddScoped<PhantomLoginManager>();
services.AddAuthorization(ConfigureAuthorization);
services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>(); services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>();
services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>(); services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>();
} }
@ -28,6 +26,19 @@ public static class PhantomWebServices {
application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>()); application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>());
} }
private static void ConfigureIdentityCookie(CookieAuthenticationOptions o) {
o.Cookie.Name = "Phantom.Identity";
o.Cookie.HttpOnly = true;
o.Cookie.SameSite = SameSiteMode.Lax;
o.ExpireTimeSpan = TimeSpan.FromDays(30);
o.SlidingExpiration = true;
o.LoginPath = PhantomIdentityMiddleware.LoginPath;
o.LogoutPath = PhantomIdentityMiddleware.LogoutPath;
o.AccessDeniedPath = PhantomIdentityMiddleware.LoginPath;
}
private static void ConfigureAuthorization(AuthorizationOptions o) { private static void ConfigureAuthorization(AuthorizationOptions o) {
foreach (var permission in Permission.All) { foreach (var permission in Permission.All) {
o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission))); o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission)));

View File

@ -1,7 +1,7 @@
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services;
public sealed class ControllerCommunication { public sealed class ControllerCommunication {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection; private readonly RpcConnectionToServer<IMessageToControllerListener> connection;

View File

@ -1,19 +0,0 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Web.Services.Rpc;
public sealed class MessageListener : IMessageToWebListener {
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection) {
this.connection = connection;
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -1,14 +1,22 @@
using Phantom.Common.Messages.Web; using NetMQ;
using NetMQ.Sockets;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
using Serilog.Events;
using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> { public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener> {
public static Task Launch(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) { public static Task Launch(RpcConfiguration config, AuthToken authToken, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcClientRuntime(config, listenerFactory, new RegisterWebMessage(authToken), disconnectSemaphore, receiveCancellationToken).Launch();
} }
private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {} private RpcClientRuntime(RpcConfiguration config, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> messageListenerFactory, RegisterWebMessage registerWebMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, WebMessageRegistries.Definitions, messageListenerFactory, registerWebMessage, disconnectSemaphore, receiveCancellationToken) {}
} }

View File

@ -2,6 +2,6 @@
namespace Phantom.Web; namespace Phantom.Web;
sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) { public sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) {
public string HttpUrl => "http://" + Host + ":" + Port; public string HttpUrl => "http://" + Host + ":" + Port;
} }

View File

@ -1,6 +1,4 @@
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web.Base; using Phantom.Web.Base;
using Phantom.Web.Services; using Phantom.Web.Services;
@ -8,9 +6,9 @@ using Serilog;
namespace Phantom.Web; namespace Phantom.Web;
static class WebLauncher { static class Launcher {
public static WebApplication CreateApplication(Configuration config, TaskManager taskManager, ServiceConfiguration serviceConfiguration, RpcConnectionToServer<IMessageToControllerListener> controllerConnection) { public static WebApplication CreateApplication(Configuration config, ServiceConfiguration serviceConfiguration, TaskManager taskManager) {
var assembly = typeof(WebLauncher).Assembly; var assembly = typeof(Launcher).Assembly;
var builder = WebApplication.CreateBuilder(new WebApplicationOptions { var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
ApplicationName = assembly.GetName().Name, ApplicationName = assembly.GetName().Name,
ContentRootPath = Path.GetDirectoryName(assembly.Location) ContentRootPath = Path.GetDirectoryName(assembly.Location)
@ -25,9 +23,8 @@ static class WebLauncher {
builder.WebHost.UseStaticWebAssets(); builder.WebHost.UseStaticWebAssets();
} }
builder.Services.AddSingleton(taskManager);
builder.Services.AddSingleton(serviceConfiguration); builder.Services.AddSingleton(serviceConfiguration);
builder.Services.AddSingleton(controllerConnection); builder.Services.AddSingleton(taskManager);
builder.Services.AddPhantomServices(config.CancellationToken); builder.Services.AddPhantomServices(config.CancellationToken);
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime()); builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());

View File

@ -1,12 +1,8 @@
using System.Reflection; using System.Reflection;
using NetMQ;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web; using Phantom.Web;
@ -50,27 +46,22 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var administratorToken = TokenGenerator.Create(60);
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web")); var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web"));
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
var webApplication = WebLauncher.CreateApplication(configuration, taskManager, serviceConfiguration, rpcSocket.Connection);
MessageListener messageListener;
await using (var scope = webApplication.Services.CreateAsyncScope()) {
messageListener = scope.ServiceProvider.GetRequiredService<MessageListener>();
}
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, messageListener, rpcDisconnectSemaphore, shutdownCancellationToken); var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, webToken, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var administratorToken = TokenGenerator.Create(60);
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken); PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup"); PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup");
await WebLauncher.Launch(configuration, webApplication);
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
var webApplication = Launcher.CreateApplication(configuration, serviceConfiguration, taskManager);
await Launcher.Launch(configuration, webApplication);
} finally { } finally {
shutdownCancellationTokenSource.Cancel(); shutdownCancellationTokenSource.Cancel();
await taskManager.Stop(); await taskManager.Stop();
@ -78,8 +69,6 @@ try {
rpcDisconnectSemaphore.Release(); rpcDisconnectSemaphore.Release();
await rpcTask; await rpcTask;
rpcDisconnectSemaphore.Dispose(); rpcDisconnectSemaphore.Dispose();
NetMQConfig.Cleanup();
} }
return 0; return 0;

View File

@ -1,5 +1,6 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;