mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 16:42:54 +01:00
Compare commits
7 Commits
4ac60f61eb
...
15d45fe1a3
Author | SHA1 | Date | |
---|---|---|---|
15d45fe1a3 | |||
0b8a870d10 | |||
16888c9b10 | |||
bfefb9063b | |||
cd332a6571 | |||
2a9bb9e6ac | |||
55b853d227 |
@ -1,9 +1,10 @@
|
|||||||
<component name="ProjectRunConfigurationManager">
|
<component name="ProjectRunConfigurationManager">
|
||||||
<configuration default="false" name="Controller + Agent x3" type="CompoundRunConfigurationType">
|
<configuration default="false" name="Controller + Web + Agent x3" type="CompoundRunConfigurationType">
|
||||||
<toRun name="Agent 1" type="DotNetProject" />
|
<toRun name="Agent 1" type="DotNetProject" />
|
||||||
<toRun name="Agent 2" type="DotNetProject" />
|
<toRun name="Agent 2" type="DotNetProject" />
|
||||||
<toRun name="Agent 3" type="DotNetProject" />
|
<toRun name="Agent 3" type="DotNetProject" />
|
||||||
<toRun name="Controller" type="DotNetProject" />
|
<toRun name="Controller" type="DotNetProject" />
|
||||||
|
<toRun name="Web" type="DotNetProject" />
|
||||||
<method v="2" />
|
<method v="2" />
|
||||||
</configuration>
|
</configuration>
|
||||||
</component>
|
</component>
|
@ -1,7 +1,8 @@
|
|||||||
<component name="ProjectRunConfigurationManager">
|
<component name="ProjectRunConfigurationManager">
|
||||||
<configuration default="false" name="Controller + Agent" type="CompoundRunConfigurationType">
|
<configuration default="false" name="Controller + Web + Agent" type="CompoundRunConfigurationType">
|
||||||
<toRun name="Agent 1" type="DotNetProject" />
|
<toRun name="Agent 1" type="DotNetProject" />
|
||||||
<toRun name="Controller" type="DotNetProject" />
|
<toRun name="Controller" type="DotNetProject" />
|
||||||
|
<toRun name="Web" type="DotNetProject" />
|
||||||
<method v="2" />
|
<method v="2" />
|
||||||
</configuration>
|
</configuration>
|
||||||
</component>
|
</component>
|
@ -6,6 +6,8 @@
|
|||||||
<option name="PASS_PARENT_ENVS" value="1" />
|
<option name="PASS_PARENT_ENVS" value="1" />
|
||||||
<envs>
|
<envs>
|
||||||
<env name="ASPNETCORE_ENVIRONMENT" value="Development" />
|
<env name="ASPNETCORE_ENVIRONMENT" value="Development" />
|
||||||
|
<env name="CONTROLLER_HOST" value="localhost" />
|
||||||
|
<env name="WEB_KEY" value="BMNHM9RRPMCBBY29D9XHS6KBKZSRY7F5XFN27YZX96XXWJC2NM2D6YRHM9PZN9JGQGCSJ6FMB2GGZ" />
|
||||||
<env name="WEB_SERVER_HOST" value="localhost" />
|
<env name="WEB_SERVER_HOST" value="localhost" />
|
||||||
</envs>
|
</envs>
|
||||||
<option name="USE_EXTERNAL_CONSOLE" value="0" />
|
<option name="USE_EXTERNAL_CONSOLE" value="0" />
|
||||||
|
25
Agent/Phantom.Agent.Rpc/ControllerConnection.cs
Normal file
25
Agent/Phantom.Agent.Rpc/ControllerConnection.cs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
using Phantom.Common.Logging;
|
||||||
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Utils.Rpc;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace Phantom.Agent.Rpc;
|
||||||
|
|
||||||
|
public sealed class ControllerConnection {
|
||||||
|
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ControllerConnection));
|
||||||
|
|
||||||
|
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
|
||||||
|
|
||||||
|
public ControllerConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
|
||||||
|
this.connection = connection;
|
||||||
|
Logger.Information("Connection ready.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||||
|
return connection.Send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
|
||||||
|
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
|
||||||
|
}
|
||||||
|
}
|
@ -2,42 +2,37 @@
|
|||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Agent.Rpc;
|
namespace Phantom.Agent.Rpc;
|
||||||
|
|
||||||
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, RegisterAgentMessage> {
|
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
|
||||||
public static Task Launch(RpcConfiguration config, AuthToken authToken, AgentInfo agentInfo, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, AgentInfo agentInfo, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||||
return new RpcClientRuntime(config, agentInfo.Guid, listenerFactory, new RegisterAgentMessage(authToken, agentInfo), disconnectSemaphore, receiveCancellationToken).Launch();
|
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly RpcConfiguration config;
|
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
||||||
private readonly Guid agentGuid;
|
|
||||||
|
|
||||||
private RpcClientRuntime(RpcConfiguration config, Guid agentGuid, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToAgentListener> messageListenerFactory, RegisterAgentMessage registerAgentMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, AgentMessageRegistries.Definitions, messageListenerFactory, registerAgentMessage, disconnectSemaphore, receiveCancellationToken) {
|
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, ILogger logger, TaskManager taskManager) {
|
||||||
this.config = config;
|
|
||||||
this.agentGuid = agentGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, TaskManager taskManager) {
|
|
||||||
ServerMessaging.SetCurrentConnection(connection);
|
|
||||||
|
|
||||||
var keepAliveLoop = new KeepAliveLoop(connection);
|
var keepAliveLoop = new KeepAliveLoop(connection);
|
||||||
try {
|
try {
|
||||||
base.RunWithConnection(socket, connection, taskManager);
|
base.RunWithConnection(socket, connection, logger, taskManager);
|
||||||
} finally {
|
} finally {
|
||||||
keepAliveLoop.Cancel();
|
keepAliveLoop.Cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task Disconnect(ClientSocket socket) {
|
protected override async Task Disconnect(ClientSocket socket, ILogger logger) {
|
||||||
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage(agentGuid)).ToArray();
|
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
|
||||||
try {
|
try {
|
||||||
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||||
} catch (TimeoutException) {
|
} catch (TimeoutException) {
|
||||||
config.RuntimeLogger.Error("Timed out communicating agent shutdown with the controller.");
|
logger.Error("Timed out communicating agent shutdown with the controller.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
using Phantom.Common.Logging;
|
|
||||||
using Phantom.Common.Messages.Agent;
|
|
||||||
using Phantom.Utils.Rpc;
|
|
||||||
using Serilog;
|
|
||||||
|
|
||||||
namespace Phantom.Agent.Rpc;
|
|
||||||
|
|
||||||
public static class ServerMessaging {
|
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging));
|
|
||||||
|
|
||||||
private static RpcConnectionToServer<IMessageToControllerListener>? CurrentConnection { get; set; }
|
|
||||||
private static RpcConnectionToServer<IMessageToControllerListener> CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
|
|
||||||
|
|
||||||
private static readonly object SetCurrentConnectionLock = new ();
|
|
||||||
|
|
||||||
internal static void SetCurrentConnection(RpcConnectionToServer<IMessageToControllerListener> connection) {
|
|
||||||
lock (SetCurrentConnectionLock) {
|
|
||||||
if (CurrentConnection != null) {
|
|
||||||
throw new InvalidOperationException("Server connection can only be set once.");
|
|
||||||
}
|
|
||||||
|
|
||||||
CurrentConnection = connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
Logger.Information("Server connection ready.");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
|
||||||
return CurrentConnectionOrThrow.Send(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
|
|
||||||
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,4 +1,5 @@
|
|||||||
using Phantom.Agent.Minecraft.Java;
|
using Phantom.Agent.Minecraft.Java;
|
||||||
|
using Phantom.Agent.Rpc;
|
||||||
using Phantom.Agent.Services.Backups;
|
using Phantom.Agent.Services.Backups;
|
||||||
using Phantom.Agent.Services.Instances;
|
using Phantom.Agent.Services.Instances;
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
@ -18,12 +19,12 @@ public sealed class AgentServices {
|
|||||||
internal JavaRuntimeRepository JavaRuntimeRepository { get; }
|
internal JavaRuntimeRepository JavaRuntimeRepository { get; }
|
||||||
internal InstanceSessionManager InstanceSessionManager { get; }
|
internal InstanceSessionManager InstanceSessionManager { get; }
|
||||||
|
|
||||||
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration) {
|
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration, ControllerConnection controllerConnection) {
|
||||||
this.AgentFolders = agentFolders;
|
this.AgentFolders = agentFolders;
|
||||||
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
|
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
|
||||||
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
|
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
|
||||||
this.JavaRuntimeRepository = new JavaRuntimeRepository();
|
this.JavaRuntimeRepository = new JavaRuntimeRepository();
|
||||||
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
|
this.InstanceSessionManager = new InstanceSessionManager(controllerConnection, agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Initialize() {
|
public async Task Initialize() {
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using Phantom.Agent.Minecraft.Launcher;
|
using Phantom.Agent.Minecraft.Launcher;
|
||||||
using Phantom.Agent.Rpc;
|
|
||||||
using Phantom.Agent.Services.Instances.Procedures;
|
using Phantom.Agent.Services.Instances.Procedures;
|
||||||
using Phantom.Agent.Services.Instances.States;
|
using Phantom.Agent.Services.Instances.States;
|
||||||
using Phantom.Common.Data.Instance;
|
using Phantom.Common.Data.Instance;
|
||||||
@ -57,7 +56,7 @@ sealed class Instance : IAsyncDisposable {
|
|||||||
|
|
||||||
public void ReportLastStatus() {
|
public void ReportLastStatus() {
|
||||||
TryUpdateStatus("Report last status of instance " + shortName, async () => {
|
TryUpdateStatus("Report last status of instance " + shortName, async () => {
|
||||||
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,14 +64,14 @@ sealed class Instance : IAsyncDisposable {
|
|||||||
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => {
|
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => {
|
||||||
if (status != currentStatus) {
|
if (status != currentStatus) {
|
||||||
currentStatus = status;
|
currentStatus = status;
|
||||||
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
|
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ReportEvent(IInstanceEvent instanceEvent) {
|
private void ReportEvent(IInstanceEvent instanceEvent) {
|
||||||
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent);
|
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent);
|
||||||
Services.TaskManager.Run("Report event for instance " + shortName, async () => await ServerMessaging.Send(message));
|
Services.TaskManager.Run("Report event for instance " + shortName, async () => await Services.ControllerConnection.Send(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void TransitionState(IInstanceState newState) {
|
internal void TransitionState(IInstanceState newState) {
|
||||||
|
@ -16,12 +16,14 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
|||||||
|
|
||||||
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
|
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
|
||||||
|
|
||||||
|
private readonly ControllerConnection controllerConnection;
|
||||||
private readonly Guid instanceGuid;
|
private readonly Guid instanceGuid;
|
||||||
private readonly Channel<string> outputChannel;
|
private readonly Channel<string> outputChannel;
|
||||||
|
|
||||||
private int droppedLinesSinceLastSend;
|
private int droppedLinesSinceLastSend;
|
||||||
|
|
||||||
public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
|
public InstanceLogSender(ControllerConnection controllerConnection, TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
|
||||||
|
this.controllerConnection = controllerConnection;
|
||||||
this.instanceGuid = instanceGuid;
|
this.instanceGuid = instanceGuid;
|
||||||
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
|
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
|
||||||
Start();
|
Start();
|
||||||
@ -61,7 +63,7 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
|||||||
|
|
||||||
private async Task SendOutputToServer(ImmutableArray<string> lines) {
|
private async Task SendOutputToServer(ImmutableArray<string> lines) {
|
||||||
if (!lines.IsEmpty) {
|
if (!lines.IsEmpty) {
|
||||||
await ServerMessaging.Send(new InstanceOutputMessage(instanceGuid, lines));
|
await controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
using Phantom.Agent.Minecraft.Launcher;
|
using Phantom.Agent.Minecraft.Launcher;
|
||||||
|
using Phantom.Agent.Rpc;
|
||||||
using Phantom.Agent.Services.Backups;
|
using Phantom.Agent.Services.Backups;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
namespace Phantom.Agent.Services.Instances;
|
namespace Phantom.Agent.Services.Instances;
|
||||||
|
|
||||||
sealed record InstanceServices(TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices);
|
sealed record InstanceServices(ControllerConnection ControllerConnection, TaskManager TaskManager, PortManager PortManager, BackupManager BackupManager, LaunchServices LaunchServices);
|
||||||
|
@ -24,6 +24,7 @@ namespace Phantom.Agent.Services.Instances;
|
|||||||
sealed class InstanceSessionManager : IAsyncDisposable {
|
sealed class InstanceSessionManager : IAsyncDisposable {
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>();
|
private static readonly ILogger Logger = PhantomLogger.Create<InstanceSessionManager>();
|
||||||
|
|
||||||
|
private readonly ControllerConnection controllerConnection;
|
||||||
private readonly AgentInfo agentInfo;
|
private readonly AgentInfo agentInfo;
|
||||||
private readonly string basePath;
|
private readonly string basePath;
|
||||||
|
|
||||||
@ -36,7 +37,8 @@ sealed class InstanceSessionManager : IAsyncDisposable {
|
|||||||
|
|
||||||
private uint instanceLoggerSequenceId = 0;
|
private uint instanceLoggerSequenceId = 0;
|
||||||
|
|
||||||
public InstanceSessionManager(AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) {
|
public InstanceSessionManager(ControllerConnection controllerConnection, AgentInfo agentInfo, AgentFolders agentFolders, JavaRuntimeRepository javaRuntimeRepository, TaskManager taskManager, BackupManager backupManager) {
|
||||||
|
this.controllerConnection = controllerConnection;
|
||||||
this.agentInfo = agentInfo;
|
this.agentInfo = agentInfo;
|
||||||
this.basePath = agentFolders.InstancesFolderPath;
|
this.basePath = agentFolders.InstancesFolderPath;
|
||||||
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
||||||
@ -45,7 +47,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
|
|||||||
var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository);
|
var launchServices = new LaunchServices(minecraftServerExecutables, javaRuntimeRepository);
|
||||||
var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
var portManager = new PortManager(agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
||||||
|
|
||||||
this.instanceServices = new InstanceServices(taskManager, portManager, backupManager, launchServices);
|
this.instanceServices = new InstanceServices(controllerConnection, taskManager, portManager, backupManager, launchServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) {
|
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) {
|
||||||
@ -146,7 +148,7 @@ sealed class InstanceSessionManager : IAsyncDisposable {
|
|||||||
var runningInstances = GetRunningInstancesInternal();
|
var runningInstances = GetRunningInstancesInternal();
|
||||||
var runningInstanceCount = runningInstances.Length;
|
var runningInstanceCount = runningInstances.Length;
|
||||||
var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
|
var runningInstanceMemory = runningInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
|
||||||
await ServerMessaging.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory));
|
await controllerConnection.Send(new ReportAgentStatusMessage(runningInstanceCount, runningInstanceMemory));
|
||||||
} finally {
|
} finally {
|
||||||
semaphore.Release();
|
semaphore.Release();
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
|
|||||||
this.context = context;
|
this.context = context;
|
||||||
this.Process = process;
|
this.Process = process;
|
||||||
|
|
||||||
this.logSender = new InstanceLogSender(context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
|
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
|
||||||
|
|
||||||
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
|
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
|
||||||
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;
|
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using Phantom.Agent.Rpc;
|
using Phantom.Common.Data.Instance;
|
||||||
using Phantom.Common.Data.Instance;
|
|
||||||
using Phantom.Common.Data.Replies;
|
using Phantom.Common.Data.Replies;
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Utils.Cryptography;
|
using Phantom.Utils.Cryptography;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using NetMQ;
|
||||||
using Phantom.Agent;
|
using Phantom.Agent;
|
||||||
using Phantom.Agent.Rpc;
|
using Phantom.Agent.Rpc;
|
||||||
using Phantom.Agent.Services;
|
using Phantom.Agent.Services;
|
||||||
@ -6,7 +7,9 @@ using Phantom.Agent.Services.Rpc;
|
|||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
@ -46,19 +49,18 @@ try {
|
|||||||
|
|
||||||
var (controllerCertificate, agentToken) = agentKey.Value;
|
var (controllerCertificate, agentToken) = agentKey.Value;
|
||||||
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
||||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks));
|
|
||||||
|
|
||||||
MessageListener MessageListenerFactory(RpcConnectionToServer<IMessageToControllerListener> connection) {
|
|
||||||
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);
|
|
||||||
}
|
|
||||||
|
|
||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||||
|
|
||||||
|
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
||||||
|
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
|
||||||
|
|
||||||
|
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
|
||||||
await agentServices.Initialize();
|
await agentServices.Initialize();
|
||||||
|
|
||||||
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
||||||
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
|
||||||
var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
|
var rpcTask = RpcClientRuntime.Launch(rpcSocket, agentInfo, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
|
||||||
try {
|
try {
|
||||||
await rpcTask.WaitAsync(shutdownCancellationToken);
|
await rpcTask.WaitAsync(shutdownCancellationToken);
|
||||||
} finally {
|
} finally {
|
||||||
@ -68,6 +70,8 @@ try {
|
|||||||
rpcDisconnectSemaphore.Release();
|
rpcDisconnectSemaphore.Release();
|
||||||
await rpcTask;
|
await rpcTask;
|
||||||
rpcDisconnectSemaphore.Dispose();
|
rpcDisconnectSemaphore.Dispose();
|
||||||
|
|
||||||
|
NetMQConfig.Cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -11,7 +11,7 @@ public static class AgentMessageRegistries {
|
|||||||
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
|
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
|
||||||
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
||||||
|
|
||||||
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
|
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
|
||||||
|
|
||||||
static AgentMessageRegistries() {
|
static AgentMessageRegistries() {
|
||||||
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
||||||
@ -33,7 +33,7 @@ public static class AgentMessageRegistries {
|
|||||||
ToController.Add<ReplyMessage>(127);
|
ToController.Add<ReplyMessage>(127);
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> {
|
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
|
||||||
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
|
public MessageRegistry<IMessageToAgentListener> ToClient => ToAgent;
|
||||||
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
|
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
|
||||||
|
|
||||||
@ -41,11 +41,7 @@ public static class AgentMessageRegistries {
|
|||||||
return messageType == typeof(RegisterAgentMessage);
|
return messageType == typeof(RegisterAgentMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
public IMessage<IMessageToAgentListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) {
|
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
|
||||||
return new ReplyMessage(sequenceId, serializedReply);
|
|
||||||
}
|
|
||||||
|
|
||||||
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
|
|
||||||
return new ReplyMessage(sequenceId, serializedReply);
|
return new ReplyMessage(sequenceId, serializedReply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,9 +4,7 @@ using Phantom.Utils.Rpc.Message;
|
|||||||
namespace Phantom.Common.Messages.Agent.ToController;
|
namespace Phantom.Common.Messages.Agent.ToController;
|
||||||
|
|
||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||||
public sealed partial record UnregisterAgentMessage(
|
public sealed partial record UnregisterAgentMessage : IMessageToController {
|
||||||
[property: MemoryPackOrder(0)] Guid AgentGuid
|
|
||||||
) : IMessageToController {
|
|
||||||
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
||||||
return listener.HandleUnregisterAgent(this);
|
return listener.HandleUnregisterAgent(this);
|
||||||
}
|
}
|
||||||
|
@ -10,28 +10,25 @@ public static class WebMessageRegistries {
|
|||||||
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
|
||||||
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
|
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
|
||||||
|
|
||||||
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
|
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> Definitions { get; } = new MessageDefinitions();
|
||||||
|
|
||||||
static WebMessageRegistries() {
|
static WebMessageRegistries() {
|
||||||
|
ToController.Add<RegisterWebMessage>(0);
|
||||||
ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1);
|
ToController.Add<CreateOrUpdateAdministratorUser, CreateOrUpdateAdministratorUserResult>(1);
|
||||||
ToController.Add<ReplyMessage>(127);
|
ToController.Add<ReplyMessage>(127);
|
||||||
|
|
||||||
ToWeb.Add<ReplyMessage>(127);
|
ToWeb.Add<ReplyMessage>(127);
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> {
|
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
|
||||||
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
|
public MessageRegistry<IMessageToWebListener> ToClient => ToWeb;
|
||||||
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
|
public MessageRegistry<IMessageToControllerListener> ToServer => ToController;
|
||||||
|
|
||||||
public bool IsRegistrationMessage(Type messageType) {
|
public bool IsRegistrationMessage(Type messageType) {
|
||||||
return false;
|
return messageType == typeof(RegisterWebMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
public IMessage<IMessageToWebListener, NoReply> CreateReplyToServerMessage( uint sequenceId, byte[] serializedReply) {
|
public ReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply) {
|
||||||
return new ReplyMessage(sequenceId, serializedReply);
|
|
||||||
}
|
|
||||||
|
|
||||||
public IMessage<IMessageToControllerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply) {
|
|
||||||
return new ReplyMessage(sequenceId, serializedReply);
|
return new ReplyMessage(sequenceId, serializedReply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,13 @@ public sealed class RpcConnectionToClient<TListener> {
|
|||||||
private readonly MessageRegistry<TListener> messageRegistry;
|
private readonly MessageRegistry<TListener> messageRegistry;
|
||||||
private readonly MessageReplyTracker messageReplyTracker;
|
private readonly MessageReplyTracker messageReplyTracker;
|
||||||
|
|
||||||
|
private volatile bool isAuthorized;
|
||||||
|
|
||||||
|
public bool IsAuthorized {
|
||||||
|
get => isAuthorized;
|
||||||
|
set => isAuthorized = value;
|
||||||
|
}
|
||||||
|
|
||||||
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
||||||
private bool isClosed;
|
private bool isClosed;
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
using Serilog.Events;
|
using Serilog.Events;
|
||||||
@ -8,49 +9,28 @@ using Serilog.Events;
|
|||||||
namespace Phantom.Controller.Rpc;
|
namespace Phantom.Controller.Rpc;
|
||||||
|
|
||||||
public static class RpcRuntime {
|
public static class RpcRuntime {
|
||||||
public static Task Launch<TClientListener, TServerListener>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
return RpcRuntime<TClientListener, TServerListener>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
return RpcRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<ServerSocket> {
|
internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
||||||
return new RpcRuntime<TClientListener, TServerListener>(config, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
var socket = RpcServerSocket.Connect(config);
|
||||||
|
return new RpcRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ServerSocket CreateSocket(RpcConfiguration config) {
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
var socket = new ServerSocket();
|
|
||||||
var options = socket.Options;
|
|
||||||
|
|
||||||
options.CurveServer = true;
|
|
||||||
options.CurveCertificate = config.ServerCertificate;
|
|
||||||
|
|
||||||
return socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
private readonly RpcConfiguration config;
|
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
|
|
||||||
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private RpcRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, CreateSocket(config)) {
|
private RpcRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
||||||
this.config = config;
|
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
this.listenerFactory = listenerFactory;
|
this.listenerFactory = listenerFactory;
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override void Connect(ServerSocket socket) {
|
protected override void Run(ServerSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
||||||
var logger = config.RuntimeLogger;
|
|
||||||
var url = config.TcpUrl;
|
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
|
||||||
socket.Bind(url);
|
|
||||||
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
|
||||||
var logger = config.RuntimeLogger;
|
|
||||||
var clients = new Dictionary<ulong, Client>();
|
var clients = new Dictionary<ulong, Client>();
|
||||||
|
|
||||||
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||||
@ -62,12 +42,14 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
|
|||||||
var (routingId, data) = socket.Receive(cancellationToken);
|
var (routingId, data) = socket.Receive(cancellationToken);
|
||||||
|
|
||||||
if (data.Length == 0) {
|
if (data.Length == 0) {
|
||||||
LogMessageType(logger, routingId, data);
|
LogMessageType(logger, routingId, data, messageType: null);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
||||||
|
|
||||||
if (!clients.TryGetValue(routingId, out var client)) {
|
if (!clients.TryGetValue(routingId, out var client)) {
|
||||||
if (!CheckIsRegistrationMessage(data, logger, routingId)) {
|
if (!CheckIsRegistrationMessage(messageType, logger, routingId)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,7 +60,11 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
|
|||||||
clients[routingId] = client;
|
clients[routingId] = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
LogMessageType(logger, routingId, data);
|
if (!client.Connection.IsAuthorized && !CheckIsRegistrationMessage(messageType, logger, routingId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LogMessageType(logger, routingId, data, messageType);
|
||||||
messageDefinitions.ToServer.Handle(data, client);
|
messageDefinitions.ToServer.Handle(data, client);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,40 +73,40 @@ internal sealed class RpcRuntime<TClientListener, TServerListener> : RpcRuntime<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data) {
|
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
|
||||||
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data.Length > 0 && messageDefinitions.ToServer.TryGetType(data, out var type)) {
|
if (data.Length > 0 && messageType != null) {
|
||||||
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", type.Name, data.Length, routingId);
|
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
|
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private bool CheckIsRegistrationMessage(ReadOnlyMemory<byte> data, ILogger logger, uint routingId) {
|
private bool CheckIsRegistrationMessage(Type? messageType, ILogger logger, uint routingId) {
|
||||||
if (messageDefinitions.ToServer.TryGetType(data, out var type) && messageDefinitions.IsRegistrationMessage(type)) {
|
if (messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", type?.Name ?? "unknown message", routingId);
|
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", messageType?.Name ?? "unknown message", routingId);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class Client : MessageHandler<TServerListener> {
|
private sealed class Client : MessageHandler<TServerListener> {
|
||||||
public RpcConnectionToClient<TClientListener> Connection { get; }
|
public RpcConnectionToClient<TClientListener> Connection { get; }
|
||||||
|
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
|
||||||
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
||||||
this.Connection = connection;
|
this.Connection = connection;
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||||
return Connection.Send(messageDefinitions.CreateReplyToServerMessage(sequenceId, serializedReply));
|
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,8 +39,8 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
|
|||||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
||||||
}
|
}
|
||||||
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
|
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
|
||||||
var guid = message.AgentInfo.Guid;
|
connection.IsAuthorized = true;
|
||||||
agentGuidWaiter.SetResult(guid);
|
agentGuidWaiter.SetResult(message.AgentInfo.Guid);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NoReply.Instance;
|
return NoReply.Instance;
|
||||||
@ -51,8 +51,11 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
|
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||||
if (agentManager.UnregisterAgent(message.AgentGuid, connection)) {
|
if (agentGuidWaiter.Task.IsCompleted) {
|
||||||
instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline);
|
var agentGuid = agentGuidWaiter.Task.Result;
|
||||||
|
if (agentManager.UnregisterAgent(agentGuid, connection)) {
|
||||||
|
instanceManager.SetInstanceStatesForAgent(agentGuid, InstanceStatus.Offline);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
connection.Close();
|
connection.Close();
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
|
|
||||||
namespace Phantom.Controller;
|
namespace Phantom.Controller;
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Utils.Cryptography;
|
using Phantom.Utils.Cryptography;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using NetMQ;
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
@ -61,10 +62,14 @@ try {
|
|||||||
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
|
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Task.WhenAll(
|
try {
|
||||||
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
await Task.WhenAll(
|
||||||
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
||||||
);
|
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
NetMQConfig.Cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
|
@ -149,8 +149,8 @@ The repository includes a [Rider](https://www.jetbrains.com/rider/) projects wit
|
|||||||
- `Controller` starts the Controller.
|
- `Controller` starts the Controller.
|
||||||
- `Web` starts the Web server.
|
- `Web` starts the Web server.
|
||||||
- `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents.
|
- `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents.
|
||||||
- `Controller + Agent` starts the Controller and Agent 1.
|
- `Controller + Web + Agent` starts the Controller and Agent 1.
|
||||||
- `Controller + Agent x3` starts the Controller and Agent 1, 2, and 3.
|
- `Controller + Web + Agent x3` starts the Controller and Agent 1, 2, and 3.
|
||||||
|
|
||||||
## Bootstrap
|
## Bootstrap
|
||||||
|
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public interface IMessageDefinitions<TClientListener, TServerListener> {
|
public interface IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
MessageRegistry<TClientListener> ToClient { get; }
|
MessageRegistry<TClientListener> ToClient { get; }
|
||||||
MessageRegistry<TServerListener> ToServer { get; }
|
MessageRegistry<TServerListener> ToServer { get; }
|
||||||
|
|
||||||
bool IsRegistrationMessage(Type messageType);
|
bool IsRegistrationMessage(Type messageType);
|
||||||
IMessage<TClientListener, NoReply> CreateReplyToServerMessage(uint sequenceId, byte[] serializedReply);
|
TReplyMessage CreateReplyMessage(uint sequenceId, byte[] serializedReply);
|
||||||
IMessage<TServerListener, NoReply> CreateReplyToClientMessage(uint sequenceId, byte[] serializedReply);
|
|
||||||
}
|
}
|
||||||
|
@ -4,14 +4,13 @@ using Serilog;
|
|||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public abstract class MessageHandler<TListener> {
|
public abstract class MessageHandler<TListener> {
|
||||||
protected TListener Listener { get; }
|
private readonly TListener listener;
|
||||||
|
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly TaskManager taskManager;
|
private readonly TaskManager taskManager;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
|
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
|
||||||
this.Listener = listener;
|
this.listener = listener;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.taskManager = taskManager;
|
this.taskManager = taskManager;
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
@ -29,12 +28,11 @@ public abstract class MessageHandler<TListener> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
TReply reply = await message.Accept(Listener);
|
TReply reply = await message.Accept(listener);
|
||||||
|
|
||||||
if (reply is not NoReply) {
|
if (reply is not NoReply) {
|
||||||
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
|
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
|
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
|
||||||
}
|
}
|
||||||
|
@ -1,56 +1,34 @@
|
|||||||
using NetMQ;
|
using NetMQ.Sockets;
|
||||||
using NetMQ.Sockets;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
using Serilog;
|
||||||
using Serilog.Events;
|
using Serilog.Events;
|
||||||
using ILogger = Serilog.ILogger;
|
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
public abstract class RpcClientRuntime<TClientListener, TServerListener, THelloMessage> : RpcRuntime<ClientSocket> where THelloMessage : IMessage<TServerListener, NoReply> {
|
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
private static ClientSocket CreateSocket(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, THelloMessage helloMessage) {
|
private readonly RpcConnectionToServer<TServerListener> connection;
|
||||||
var socket = new ClientSocket();
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
var options = socket.Options;
|
private readonly TClientListener messageListener;
|
||||||
|
|
||||||
options.CurveServerCertificate = config.ServerCertificate;
|
|
||||||
options.CurveCertificate = new NetMQCertificate();
|
|
||||||
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
|
|
||||||
|
|
||||||
return socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
private readonly RpcConfiguration config;
|
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
|
|
||||||
private readonly Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory;
|
|
||||||
|
|
||||||
private readonly SemaphoreSlim disconnectSemaphore;
|
private readonly SemaphoreSlim disconnectSemaphore;
|
||||||
private readonly CancellationToken receiveCancellationToken;
|
private readonly CancellationToken receiveCancellationToken;
|
||||||
|
|
||||||
protected RpcClientRuntime(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, Func<RpcConnectionToServer<TServerListener>, TClientListener> messageListenerFactory, THelloMessage helloMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, CreateSocket(config, messageDefinitions, helloMessage)) {
|
protected RpcClientRuntime(RpcClientSocket<TClientListener, TServerListener, TReplyMessage> socket, TClientListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket) {
|
||||||
this.config = config;
|
this.connection = socket.Connection;
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = socket.MessageDefinitions;
|
||||||
this.messageListenerFactory = messageListenerFactory;
|
this.messageListener = messageListener;
|
||||||
this.disconnectSemaphore = disconnectSemaphore;
|
this.disconnectSemaphore = disconnectSemaphore;
|
||||||
this.receiveCancellationToken = receiveCancellationToken;
|
this.receiveCancellationToken = receiveCancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected sealed override void Connect(ClientSocket socket) {
|
protected sealed override void Run(ClientSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
||||||
var logger = config.RuntimeLogger;
|
RunWithConnection(socket, connection, logger, taskManager);
|
||||||
var url = config.TcpUrl;
|
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
|
||||||
socket.Connect(url);
|
|
||||||
logger.Information("ZeroMQ client ready.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected sealed override void Run(ClientSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, ILogger logger, TaskManager taskManager) {
|
||||||
var connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, replyTracker);
|
var handler = new Handler(connection, messageDefinitions, messageListener, logger, taskManager, receiveCancellationToken);
|
||||||
RunWithConnection(socket, connection, taskManager);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, TaskManager taskManager) {
|
|
||||||
var logger = config.RuntimeLogger;
|
|
||||||
var handler = new Handler(connection, messageDefinitions, messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (!receiveCancellationToken.IsCancellationRequested) {
|
while (!receiveCancellationToken.IsCancellationRequested) {
|
||||||
@ -85,15 +63,15 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, THelloM
|
|||||||
|
|
||||||
private sealed class Handler : MessageHandler<TClientListener> {
|
private sealed class Handler : MessageHandler<TClientListener> {
|
||||||
private readonly RpcConnectionToServer<TServerListener> connection;
|
private readonly RpcConnectionToServer<TServerListener> connection;
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener> messageDefinitions;
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
|
||||||
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||||
return connection.Send(messageDefinitions.CreateReplyToClientMessage(sequenceId, serializedReply));
|
return connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ public sealed class RpcConnectionToServer<TListener> {
|
|||||||
private readonly MessageRegistry<TListener> messageRegistry;
|
private readonly MessageRegistry<TListener> messageRegistry;
|
||||||
private readonly MessageReplyTracker replyTracker;
|
private readonly MessageReplyTracker replyTracker;
|
||||||
|
|
||||||
public RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
|
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.messageRegistry = messageRegistry;
|
this.messageRegistry = messageRegistry;
|
||||||
this.replyTracker = replyTracker;
|
this.replyTracker = replyTracker;
|
||||||
|
@ -1,39 +1,28 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
static class RpcRuntime {
|
|
||||||
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
|
|
||||||
// TODO test behavior when either agent or server are offline for a very long time
|
|
||||||
options.DelayAttachOnConnect = true;
|
|
||||||
options.ReceiveHighWatermark = 10_000;
|
|
||||||
options.SendHighWatermark = 10_000;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
||||||
private readonly TSocket socket;
|
private readonly TSocket socket;
|
||||||
private readonly ILogger runtimeLogger;
|
private readonly ILogger runtimeLogger;
|
||||||
private readonly MessageReplyTracker replyTracker;
|
private readonly MessageReplyTracker replyTracker;
|
||||||
private readonly TaskManager taskManager;
|
private readonly TaskManager taskManager;
|
||||||
|
|
||||||
protected RpcRuntime(RpcConfiguration configuration, TSocket socket) {
|
protected RpcRuntime(RpcSocket<TSocket> socket) {
|
||||||
RpcRuntime.SetDefaultSocketOptions(socket.Options);
|
this.socket = socket.Socket;
|
||||||
this.socket = socket;
|
this.runtimeLogger = socket.Config.RuntimeLogger;
|
||||||
this.runtimeLogger = configuration.RuntimeLogger;
|
this.replyTracker = socket.ReplyTracker;
|
||||||
this.replyTracker = new MessageReplyTracker(runtimeLogger);
|
this.taskManager = new TaskManager(socket.Config.TaskManagerLogger);
|
||||||
this.taskManager = new TaskManager(configuration.TaskManagerLogger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected async Task Launch() {
|
protected async Task Launch() {
|
||||||
Connect(socket);
|
|
||||||
|
|
||||||
void RunTask() {
|
void RunTask() {
|
||||||
try {
|
try {
|
||||||
Run(socket, replyTracker, taskManager);
|
Run(socket, runtimeLogger, replyTracker, taskManager);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
runtimeLogger.Error(e, "Caught exception in RPC thread.");
|
runtimeLogger.Error(e, "Caught exception in RPC thread.");
|
||||||
}
|
}
|
||||||
@ -42,21 +31,19 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
|||||||
try {
|
try {
|
||||||
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// ignore
|
// Ignore.
|
||||||
} finally {
|
} finally {
|
||||||
await taskManager.Stop();
|
await taskManager.Stop();
|
||||||
await Disconnect(socket);
|
await Disconnect(socket, runtimeLogger);
|
||||||
|
|
||||||
socket.Dispose();
|
socket.Dispose();
|
||||||
NetMQConfig.Cleanup();
|
runtimeLogger.Information("ZeroMQ runtime stopped.");
|
||||||
runtimeLogger.Information("ZeroMQ client stopped.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void Connect(TSocket socket);
|
protected abstract void Run(TSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager);
|
||||||
protected abstract void Run(TSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager);
|
|
||||||
|
|
||||||
protected virtual Task Disconnect(TSocket socket) {
|
protected virtual Task Disconnect(TSocket socket, ILogger logger) {
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
40
Utils/Phantom.Utils.Rpc/Sockets/RpcClientSocket.cs
Normal file
40
Utils/Phantom.Utils.Rpc/Sockets/RpcClientSocket.cs
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
using NetMQ;
|
||||||
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Sockets;
|
||||||
|
|
||||||
|
public static class RpcClientSocket {
|
||||||
|
public static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<TClientListener, TServerListener, TReplyMessage, THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
|
return RpcClientSocket<TClientListener, TServerListener, TReplyMessage>.Connect(config, messageDefinitions, helloMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMessage> : RpcSocket<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
|
internal static RpcClientSocket<TClientListener, TServerListener, TReplyMessage> Connect<THelloMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, THelloMessage helloMessage) where THelloMessage : IMessage<TServerListener, NoReply> {
|
||||||
|
var socket = new ClientSocket();
|
||||||
|
var options = socket.Options;
|
||||||
|
|
||||||
|
options.CurveServerCertificate = config.ServerCertificate;
|
||||||
|
options.CurveCertificate = new NetMQCertificate();
|
||||||
|
options.HelloMessage = messageDefinitions.ToServer.Write(helloMessage).ToArray();
|
||||||
|
RpcSocket.SetDefaultSocketOptions(options);
|
||||||
|
|
||||||
|
var url = config.TcpUrl;
|
||||||
|
var logger = config.RuntimeLogger;
|
||||||
|
|
||||||
|
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
||||||
|
socket.Connect(url);
|
||||||
|
logger.Information("ZeroMQ client ready.");
|
||||||
|
|
||||||
|
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcConnectionToServer<TServerListener> Connection { get; }
|
||||||
|
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
|
||||||
|
|
||||||
|
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
|
||||||
|
MessageDefinitions = messageDefinitions;
|
||||||
|
Connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, ReplyTracker);
|
||||||
|
}
|
||||||
|
}
|
25
Utils/Phantom.Utils.Rpc/Sockets/RpcServerSocket.cs
Normal file
25
Utils/Phantom.Utils.Rpc/Sockets/RpcServerSocket.cs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
using NetMQ.Sockets;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Sockets;
|
||||||
|
|
||||||
|
public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
|
||||||
|
public static RpcServerSocket Connect(RpcConfiguration config) {
|
||||||
|
var socket = new ServerSocket();
|
||||||
|
var options = socket.Options;
|
||||||
|
|
||||||
|
options.CurveServer = true;
|
||||||
|
options.CurveCertificate = config.ServerCertificate;
|
||||||
|
RpcSocket.SetDefaultSocketOptions(options);
|
||||||
|
|
||||||
|
var url = config.TcpUrl;
|
||||||
|
var logger = config.RuntimeLogger;
|
||||||
|
|
||||||
|
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
||||||
|
socket.Bind(url);
|
||||||
|
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
|
||||||
|
|
||||||
|
return new RpcServerSocket(socket, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RpcServerSocket(ServerSocket socket, RpcConfiguration config) : base(socket, config) {}
|
||||||
|
}
|
25
Utils/Phantom.Utils.Rpc/Sockets/RpcSocket.cs
Normal file
25
Utils/Phantom.Utils.Rpc/Sockets/RpcSocket.cs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
using NetMQ;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Sockets;
|
||||||
|
|
||||||
|
static class RpcSocket {
|
||||||
|
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
|
||||||
|
// TODO test behavior when either agent or server are offline for a very long time
|
||||||
|
options.DelayAttachOnConnect = true;
|
||||||
|
options.ReceiveHighWatermark = 10_000;
|
||||||
|
options.SendHighWatermark = 10_000;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
|
||||||
|
internal TSocket Socket { get; }
|
||||||
|
internal RpcConfiguration Config { get; }
|
||||||
|
internal MessageReplyTracker ReplyTracker { get; }
|
||||||
|
|
||||||
|
protected RpcSocket(TSocket socket, RpcConfiguration config) {
|
||||||
|
Socket = socket;
|
||||||
|
Config = config;
|
||||||
|
ReplyTracker = new MessageReplyTracker(config.RuntimeLogger);
|
||||||
|
}
|
||||||
|
}
|
@ -1,21 +1,23 @@
|
|||||||
using Microsoft.AspNetCore.Authentication.Cookies;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
|
||||||
using Microsoft.AspNetCore.Components.Authorization;
|
using Microsoft.AspNetCore.Components.Authorization;
|
||||||
using Microsoft.AspNetCore.Components.Server;
|
using Microsoft.AspNetCore.Components.Server;
|
||||||
using Phantom.Common.Data.Web.Users;
|
using Phantom.Common.Data.Web.Users;
|
||||||
using Phantom.Web.Services.Authentication;
|
using Phantom.Web.Services.Authentication;
|
||||||
using Phantom.Web.Services.Authorization;
|
using Phantom.Web.Services.Authorization;
|
||||||
|
using Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
namespace Phantom.Web.Services;
|
namespace Phantom.Web.Services;
|
||||||
|
|
||||||
public static class PhantomWebServices {
|
public static class PhantomWebServices {
|
||||||
public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) {
|
public static void AddPhantomServices(this IServiceCollection services, CancellationToken cancellationToken) {
|
||||||
services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme).AddCookie(ConfigureIdentityCookie);
|
services.AddSingleton<MessageListener>();
|
||||||
services.AddAuthorization(ConfigureAuthorization);
|
services.AddSingleton<ControllerCommunication>();
|
||||||
|
services.AddSingleton<PermissionManager>();
|
||||||
|
|
||||||
services.AddSingleton(PhantomLoginStore.Create(cancellationToken));
|
services.AddSingleton(PhantomLoginStore.Create(cancellationToken));
|
||||||
services.AddScoped<PhantomLoginManager>();
|
services.AddScoped<PhantomLoginManager>();
|
||||||
|
|
||||||
|
services.AddAuthorization(ConfigureAuthorization);
|
||||||
services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>();
|
services.AddScoped<IAuthorizationHandler, PermissionBasedPolicyHandler>();
|
||||||
services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>();
|
services.AddScoped<AuthenticationStateProvider, ServerAuthenticationStateProvider>();
|
||||||
}
|
}
|
||||||
@ -26,19 +28,6 @@ public static class PhantomWebServices {
|
|||||||
application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>());
|
application.UseWhen(PhantomIdentityMiddleware.AcceptsPath, static app => app.UseMiddleware<PhantomIdentityMiddleware>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ConfigureIdentityCookie(CookieAuthenticationOptions o) {
|
|
||||||
o.Cookie.Name = "Phantom.Identity";
|
|
||||||
o.Cookie.HttpOnly = true;
|
|
||||||
o.Cookie.SameSite = SameSiteMode.Lax;
|
|
||||||
|
|
||||||
o.ExpireTimeSpan = TimeSpan.FromDays(30);
|
|
||||||
o.SlidingExpiration = true;
|
|
||||||
|
|
||||||
o.LoginPath = PhantomIdentityMiddleware.LoginPath;
|
|
||||||
o.LogoutPath = PhantomIdentityMiddleware.LogoutPath;
|
|
||||||
o.AccessDeniedPath = PhantomIdentityMiddleware.LoginPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void ConfigureAuthorization(AuthorizationOptions o) {
|
private static void ConfigureAuthorization(AuthorizationOptions o) {
|
||||||
foreach (var permission in Permission.All) {
|
foreach (var permission in Permission.All) {
|
||||||
o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission)));
|
o.AddPolicy(permission.Id, policy => policy.Requirements.Add(new PermissionBasedPolicyRequirement(permission)));
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
|
|
||||||
namespace Phantom.Web.Services;
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
public sealed class ControllerCommunication {
|
public sealed class ControllerCommunication {
|
||||||
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
|
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
|
||||||
|
19
Web/Phantom.Web.Services/Rpc/MessageListener.cs
Normal file
19
Web/Phantom.Web.Services/Rpc/MessageListener.cs
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
using Phantom.Common.Messages.Web;
|
||||||
|
using Phantom.Common.Messages.Web.BiDirectional;
|
||||||
|
using Phantom.Utils.Rpc;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
|
public sealed class MessageListener : IMessageToWebListener {
|
||||||
|
private readonly RpcConnectionToServer<IMessageToControllerListener> connection;
|
||||||
|
|
||||||
|
public MessageListener(RpcConnectionToServer<IMessageToControllerListener> connection) {
|
||||||
|
this.connection = connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<NoReply> HandleReply(ReplyMessage message) {
|
||||||
|
connection.Receive(message);
|
||||||
|
return Task.FromResult(NoReply.Instance);
|
||||||
|
}
|
||||||
|
}
|
@ -1,22 +1,14 @@
|
|||||||
using NetMQ;
|
using Phantom.Common.Messages.Web;
|
||||||
using NetMQ.Sockets;
|
|
||||||
using Phantom.Common.Data;
|
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
using Phantom.Common.Messages.Web;
|
|
||||||
using Phantom.Common.Messages.Web.BiDirectional;
|
using Phantom.Common.Messages.Web.BiDirectional;
|
||||||
using Phantom.Common.Messages.Web.ToController;
|
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog.Events;
|
|
||||||
using ILogger = Serilog.ILogger;
|
|
||||||
|
|
||||||
namespace Phantom.Web.Services.Rpc;
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener> {
|
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> {
|
||||||
public static Task Launch(RpcConfiguration config, AuthToken authToken, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
public static Task Launch(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||||
return new RpcClientRuntime(config, listenerFactory, new RegisterWebMessage(authToken), disconnectSemaphore, receiveCancellationToken).Launch();
|
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
private RpcClientRuntime(RpcConfiguration config, Func<RpcConnectionToServer<IMessageToControllerListener>, IMessageToWebListener> messageListenerFactory, RegisterWebMessage registerWebMessage, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, WebMessageRegistries.Definitions, messageListenerFactory, registerWebMessage, disconnectSemaphore, receiveCancellationToken) {}
|
private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,6 @@
|
|||||||
|
|
||||||
namespace Phantom.Web;
|
namespace Phantom.Web;
|
||||||
|
|
||||||
public sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) {
|
sealed record Configuration(ILogger Logger, string Host, ushort Port, string BasePath, string DataProtectionKeyFolderPath, CancellationToken CancellationToken) {
|
||||||
public string HttpUrl => "http://" + Host + ":" + Port;
|
public string HttpUrl => "http://" + Host + ":" + Port;
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,12 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using NetMQ;
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
|
using Phantom.Common.Messages.Web;
|
||||||
|
using Phantom.Common.Messages.Web.ToController;
|
||||||
using Phantom.Utils.Cryptography;
|
using Phantom.Utils.Cryptography;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Phantom.Web;
|
using Phantom.Web;
|
||||||
@ -46,22 +50,27 @@ try {
|
|||||||
|
|
||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
|
||||||
|
|
||||||
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web"));
|
|
||||||
|
|
||||||
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
|
||||||
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
||||||
var rpcTask = RpcClientRuntime.Launch(rpcConfiguration, webToken, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
|
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
|
||||||
try {
|
|
||||||
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
|
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
|
||||||
|
var administratorToken = TokenGenerator.Create(60);
|
||||||
|
|
||||||
var administratorToken = TokenGenerator.Create(60);
|
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Web"));
|
||||||
|
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
|
||||||
|
var webApplication = WebLauncher.CreateApplication(configuration, taskManager, serviceConfiguration, rpcSocket.Connection);
|
||||||
|
|
||||||
|
MessageListener messageListener;
|
||||||
|
await using (var scope = webApplication.Services.CreateAsyncScope()) {
|
||||||
|
messageListener = scope.ServiceProvider.GetRequiredService<MessageListener>();
|
||||||
|
}
|
||||||
|
|
||||||
|
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
||||||
|
var rpcTask = RpcClientRuntime.Launch(rpcSocket, messageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
|
||||||
|
try {
|
||||||
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
|
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
|
||||||
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup");
|
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", configuration.HttpUrl, configuration.BasePath + "setup");
|
||||||
|
await WebLauncher.Launch(configuration, webApplication);
|
||||||
var serviceConfiguration = new ServiceConfiguration(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken), shutdownCancellationToken);
|
|
||||||
var webApplication = Launcher.CreateApplication(configuration, serviceConfiguration, taskManager);
|
|
||||||
|
|
||||||
await Launcher.Launch(configuration, webApplication);
|
|
||||||
} finally {
|
} finally {
|
||||||
shutdownCancellationTokenSource.Cancel();
|
shutdownCancellationTokenSource.Cancel();
|
||||||
await taskManager.Stop();
|
await taskManager.Stop();
|
||||||
@ -69,6 +78,8 @@ try {
|
|||||||
rpcDisconnectSemaphore.Release();
|
rpcDisconnectSemaphore.Release();
|
||||||
await rpcTask;
|
await rpcTask;
|
||||||
rpcDisconnectSemaphore.Dispose();
|
rpcDisconnectSemaphore.Dispose();
|
||||||
|
|
||||||
|
NetMQConfig.Cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
using Phantom.Utils.Cryptography;
|
using Phantom.Utils.Cryptography;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using Microsoft.AspNetCore.DataProtection;
|
using Microsoft.AspNetCore.DataProtection;
|
||||||
|
using Phantom.Common.Messages.Web;
|
||||||
|
using Phantom.Utils.Rpc;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Phantom.Web.Base;
|
using Phantom.Web.Base;
|
||||||
using Phantom.Web.Services;
|
using Phantom.Web.Services;
|
||||||
@ -6,9 +8,9 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Web;
|
namespace Phantom.Web;
|
||||||
|
|
||||||
static class Launcher {
|
static class WebLauncher {
|
||||||
public static WebApplication CreateApplication(Configuration config, ServiceConfiguration serviceConfiguration, TaskManager taskManager) {
|
public static WebApplication CreateApplication(Configuration config, TaskManager taskManager, ServiceConfiguration serviceConfiguration, RpcConnectionToServer<IMessageToControllerListener> controllerConnection) {
|
||||||
var assembly = typeof(Launcher).Assembly;
|
var assembly = typeof(WebLauncher).Assembly;
|
||||||
var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
|
var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
|
||||||
ApplicationName = assembly.GetName().Name,
|
ApplicationName = assembly.GetName().Name,
|
||||||
ContentRootPath = Path.GetDirectoryName(assembly.Location)
|
ContentRootPath = Path.GetDirectoryName(assembly.Location)
|
||||||
@ -23,8 +25,9 @@ static class Launcher {
|
|||||||
builder.WebHost.UseStaticWebAssets();
|
builder.WebHost.UseStaticWebAssets();
|
||||||
}
|
}
|
||||||
|
|
||||||
builder.Services.AddSingleton(serviceConfiguration);
|
|
||||||
builder.Services.AddSingleton(taskManager);
|
builder.Services.AddSingleton(taskManager);
|
||||||
|
builder.Services.AddSingleton(serviceConfiguration);
|
||||||
|
builder.Services.AddSingleton(controllerConnection);
|
||||||
builder.Services.AddPhantomServices(config.CancellationToken);
|
builder.Services.AddPhantomServices(config.CancellationToken);
|
||||||
|
|
||||||
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());
|
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());
|
Loading…
Reference in New Issue
Block a user