1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-17 12:24:49 +02:00

2 Commits

Author SHA1 Message Date
47e563f7ce Replace NetMQ with custom TCP logic 2025-09-10 20:15:25 +02:00
29c403f2d4 Replace NetMQ with custom TCP logic 2025-09-10 19:48:32 +02:00
36 changed files with 202 additions and 251 deletions

View File

@@ -1,43 +0,0 @@
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Agent.Rpc;
sealed class KeepAliveLoop {
private static readonly ILogger Logger = PhantomLogger.Create<KeepAliveLoop>();
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly CancellationTokenSource cancellationTokenSource = new ();
public KeepAliveLoop(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Task.Run(Run);
}
private async Task Run() {
var cancellationToken = cancellationTokenSource.Token;
try {
await connection.IsReady.WaitAsync(cancellationToken);
Logger.Information("Started keep-alive loop.");
while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
}
} catch (OperationCanceledException) {
// Ignore.
} finally {
cancellationTokenSource.Dispose();
Logger.Information("Stopped keep-alive loop.");
}
}
public void Cancel() {
cancellationTokenSource.Cancel();
}
}

View File

@@ -1,12 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
</ItemGroup>
</Project>

View File

@@ -1,8 +1,8 @@
using Akka.Actor; using Akka.Actor;
using Phantom.Agent.Minecraft.Java; using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;

View File

@@ -4,8 +4,8 @@ using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Minecraft.Launcher.Types; using Phantom.Agent.Minecraft.Launcher.Types;
using Phantom.Agent.Minecraft.Properties; using Phantom.Agent.Minecraft.Properties;
using Phantom.Agent.Minecraft.Server; using Phantom.Agent.Minecraft.Server;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;

View File

@@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Launcher; using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Rpc;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;

View File

@@ -1,4 +1,4 @@
using Phantom.Agent.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;

View File

@@ -1,6 +1,6 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Threading.Channels; using System.Threading.Channels;
using Phantom.Agent.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;

View File

@@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Instance; using Phantom.Agent.Minecraft.Instance;
using Phantom.Agent.Minecraft.Server; using Phantom.Agent.Minecraft.Server;
using Phantom.Agent.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;

View File

@@ -8,7 +8,6 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj" /> <ProjectReference Include="..\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj" />
<ProjectReference Include="..\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@@ -1,7 +1,7 @@
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Services.Rpc;
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) { public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) {
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController { public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {

View File

@@ -1,6 +1,6 @@
using Phantom.Common.Data; using Phantom.Common.Data;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Services.Rpc;
sealed class RpcClientAgentHandshake(AuthToken authToken) { sealed class RpcClientAgentHandshake(AuthToken authToken) {

View File

@@ -1,6 +1,5 @@
using System.Reflection; using System.Reflection;
using Phantom.Agent; using Phantom.Agent;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services; using Phantom.Agent.Services;
using Phantom.Agent.Services.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
@@ -50,24 +49,31 @@ try {
var (certificateThumbprint, authToken) = agentKey.Value; var (certificateThumbprint, authToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts); var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent..."); var rpcClientConnectionParameters = new RpcClientConnectionParameters(
Host: controllerHost,
Port: controllerPort,
DistinguishedName: "phantom-controller",
CertificateThumbprint: certificateThumbprint,
SendQueueCapacity: 500,
PingInterval: TimeSpan.FromSeconds(10)
);
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", controllerHost, controllerPort, "phantom-controller", certificateThumbprint, null, AgentMessageRegistries.Definitions, shutdownCancellationToken); using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
if (rpcClient == null) { if (rpcClient == null) {
return 1; return 1;
} }
Task? rpcClientListener = null; Task? rpcClientListener = null;
try { try {
// var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate); PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
//
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel)); var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
await agentServices.Initialize(); await agentServices.Initialize();
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource); var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource);
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler"); var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor); rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
await shutdownCancellationToken.WaitHandle.WaitOneAsync(); await shutdownCancellationToken.WaitHandle.WaitOneAsync();

View File

@@ -3,7 +3,6 @@ using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent.Handshake; using Phantom.Common.Messages.Agent.Handshake;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Rpc; namespace Phantom.Controller.Services.Rpc;

View File

@@ -18,8 +18,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent", "Agent\Phan
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Minecraft", "Agent\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj", "{9FE000D0-91AC-4CB4-8956-91CCC0270015}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Minecraft", "Agent\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj", "{9FE000D0-91AC-4CB4-8956-91CCC0270015}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Rpc", "Agent\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj", "{665C7B87-0165-48BC-B6A6-17A3812A70C9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Services", "Agent\Phantom.Agent.Services\Phantom.Agent.Services.csproj", "{AEE8B77E-AB07-423F-9981-8CD829ACB834}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Services", "Agent\Phantom.Agent.Services\Phantom.Agent.Services.csproj", "{AEE8B77E-AB07-423F-9981-8CD829ACB834}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data", "Common\Phantom.Common.Data\Phantom.Common.Data.csproj", "{6C3DB1E5-F695-4D70-8F3A-78C2957274BE}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data", "Common\Phantom.Common.Data\Phantom.Common.Data.csproj", "{6C3DB1E5-F695-4D70-8F3A-78C2957274BE}"
@@ -76,10 +74,6 @@ Global
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Debug|Any CPU.Build.0 = Debug|Any CPU {9FE000D0-91AC-4CB4-8956-91CCC0270015}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.ActiveCfg = Release|Any CPU {9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.Build.0 = Release|Any CPU {9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.Build.0 = Release|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Release|Any CPU.Build.0 = Release|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.Build.0 = Debug|Any CPU {AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Release|Any CPU.ActiveCfg = Release|Any CPU {AEE8B77E-AB07-423F-9981-8CD829ACB834}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -164,7 +158,6 @@ Global
GlobalSection(NestedProjects) = preSolution GlobalSection(NestedProjects) = preSolution
{418BE1BF-9F63-4B46-B4E4-DF64C3B3DDA7} = {F5878792-64C8-4ECF-A075-66341FF97127} {418BE1BF-9F63-4B46-B4E4-DF64C3B3DDA7} = {F5878792-64C8-4ECF-A075-66341FF97127}
{9FE000D0-91AC-4CB4-8956-91CCC0270015} = {F5878792-64C8-4ECF-A075-66341FF97127} {9FE000D0-91AC-4CB4-8956-91CCC0270015} = {F5878792-64C8-4ECF-A075-66341FF97127}
{665C7B87-0165-48BC-B6A6-17A3812A70C9} = {F5878792-64C8-4ECF-A075-66341FF97127}
{AEE8B77E-AB07-423F-9981-8CD829ACB834} = {F5878792-64C8-4ECF-A075-66341FF97127} {AEE8B77E-AB07-423F-9981-8CD829ACB834} = {F5878792-64C8-4ECF-A075-66341FF97127}
{6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}

View File

@@ -3,10 +3,12 @@
namespace Phantom.Utils.Rpc.Frame; namespace Phantom.Utils.Rpc.Frame;
interface IFrame { interface IFrame {
private const byte TypePingId = 0;
private const byte TypeMessageId = 1; private const byte TypeMessageId = 1;
private const byte TypeReplyId = 2; private const byte TypeReplyId = 2;
private const byte TypeErrorId = 3; private const byte TypeErrorId = 3;
static readonly ReadOnlyMemory<byte> TypePing = new ([TypePingId]);
static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]); static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]);
static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]); static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]);
static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]); static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]);
@@ -18,6 +20,9 @@ interface IFrame {
await stream.ReadExactlyAsync(oneByteBuffer, cancellationToken); await stream.ReadExactlyAsync(oneByteBuffer, cancellationToken);
switch (oneByteBuffer[0]) { switch (oneByteBuffer[0]) {
case TypePingId:
break;
case TypeMessageId: case TypeMessageId:
var messageFrame = await MessageFrame.Read(stream, cancellationToken); var messageFrame = await MessageFrame.Read(stream, cancellationToken);
await reader.OnMessageFrame(messageFrame, stream, cancellationToken); await reader.OnMessageFrame(messageFrame, stream, cancellationToken);

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types; namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types; namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -0,0 +1,11 @@
namespace Phantom.Utils.Rpc.Frame.Types;
sealed record PingFrame : IFrame {
public static PingFrame Instance { get; } = new PingFrame();
public ReadOnlyMemory<byte> Type => IFrame.TypePing;
public Task Write(Stream stream, CancellationToken cancellationToken) {
return Task.CompletedTask;
}
}

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types; namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -1,7 +1,6 @@
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Frame.Types; using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;

View File

@@ -1,7 +1,6 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;

View File

@@ -3,16 +3,15 @@ using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Frame; using Phantom.Utils.Rpc.Frame;
using Phantom.Utils.Rpc.Frame.Types; using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime.Tls;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable { public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) { public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
RpcClientConnector connector = new RpcClientConnector(loggerName, host, port, distinguishedName, certificateThumbprint, handshake); RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters, handshake);
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken); RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, messageDefinitions, connector, connection); return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
} }
private readonly ILogger logger; private readonly ILogger logger;
@@ -21,12 +20,12 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
public RpcSendChannel<TClientToServerMessage> SendChannel { get; } public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
private RpcClient(string loggerName, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) { private RpcClient(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) {
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName); this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
this.serverToClientMessageRegistry = messageDefinitions.ToClient; this.serverToClientMessageRegistry = messageDefinitions.ToClient;
this.connection = new RpcClientConnection(loggerName, connector, connection); this.connection = new RpcClientConnection(loggerName, connector, connection);
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, this.connection, messageDefinitions.ToServer, sendQueueCapacity: 500); this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters, this.connection, messageDefinitions.ToServer);
} }
public async Task Listen(ActorRef<TServerToClientMessage> actor) { public async Task Listen(ActorRef<TServerToClientMessage> actor) {

View File

@@ -0,0 +1,12 @@
using Phantom.Utils.Rpc.Runtime.Tls;
namespace Phantom.Utils.Rpc.Runtime;
public readonly record struct RpcClientConnectionParameters(
string Host,
ushort Port,
string DistinguishedName,
RpcCertificateThumbprint CertificateThumbprint,
ushort SendQueueCapacity,
TimeSpan PingInterval
);

View File

@@ -15,19 +15,15 @@ internal sealed class RpcClientConnector {
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10); private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
private readonly ILogger logger; private readonly ILogger logger;
private readonly string host; private readonly RpcClientConnectionParameters parameters;
private readonly ushort port;
private readonly RpcCertificateThumbprint certificateThumbprint;
private readonly RpcClientHandshake handshake; private readonly RpcClientHandshake handshake;
private readonly SslClientAuthenticationOptions sslOptions; private readonly SslClientAuthenticationOptions sslOptions;
private bool loggedCertificateValidationError = false; private bool loggedCertificateValidationError = false;
public RpcClientConnector(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake) { public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters, RpcClientHandshake handshake) {
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName); this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
this.host = host; this.parameters = parameters;
this.port = port;
this.certificateThumbprint = certificateThumbprint;
this.handshake = handshake; this.handshake = handshake;
this.sslOptions = new SslClientAuthenticationOptions { this.sslOptions = new SslClientAuthenticationOptions {
@@ -37,7 +33,7 @@ internal sealed class RpcClientConnector {
EnabledSslProtocols = TlsSupport.SupportedProtocols, EnabledSslProtocols = TlsSupport.SupportedProtocols,
EncryptionPolicy = EncryptionPolicy.RequireEncryption, EncryptionPolicy = EncryptionPolicy.RequireEncryption,
RemoteCertificateValidationCallback = ValidateServerCertificate, RemoteCertificateValidationCallback = ValidateServerCertificate,
TargetHost = distinguishedName, TargetHost = parameters.DistinguishedName,
}; };
} }
@@ -65,11 +61,11 @@ internal sealed class RpcClientConnector {
} }
internal async Task<Connection?> EstablishNewConnection(CancellationToken cancellationToken) { internal async Task<Connection?> EstablishNewConnection(CancellationToken cancellationToken) {
logger.Information("Connecting to {Host}:{Port}...", host, port); logger.Information("Connecting to {Host}:{Port}...", parameters.Host, parameters.Port);
Socket clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp); Socket clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
try { try {
await clientSocket.ConnectAsync(host, port, cancellationToken); await clientSocket.ConnectAsync(parameters.Host, parameters.Port, cancellationToken);
} catch (Exception e) { } catch (Exception e) {
logger.Error(e, "Could not connect."); logger.Error(e, "Could not connect.");
throw; throw;
@@ -129,7 +125,7 @@ internal sealed class RpcClientConnector {
else if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) { else if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) {
logger.Error("Could not establish a secure connection, server certificate has the wrong name: {Name}", certificate.Subject); logger.Error("Could not establish a secure connection, server certificate has the wrong name: {Name}", certificate.Subject);
} }
else if (!certificateThumbprint.Check(certificate)) { else if (!parameters.CertificateThumbprint.Check(certificate)) {
logger.Error("Could not establish a secure connection, server certificate does not match."); logger.Error("Could not establish a secure connection, server certificate does not match.");
} }
else if (TlsSupport.CheckAlgorithm((X509Certificate2) certificate) is {} error) { else if (TlsSupport.CheckAlgorithm((X509Certificate2) certificate) is {} error) {

View File

@@ -1,5 +1,20 @@
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcErrorException(string message, RpcError error) : Exception(message) { public sealed class RpcErrorException : Exception {
public RpcError Error => error; internal static RpcErrorException From(RpcError error) {
return error switch {
RpcError.InvalidData => new RpcErrorException("Invalid data", error),
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
RpcError.MessageTooLarge => new RpcErrorException("Message is too large", error),
_ => new RpcErrorException("Unknown error", error),
};
}
public RpcError Error { get; }
internal RpcErrorException(string message, RpcError error) : base(message) {
this.Error = error;
}
} }

View File

@@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Runtime;
sealed class RpcReceiveChannel {
}

View File

@@ -1,30 +1,36 @@
using System.Threading.Channels; using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Frame; using Phantom.Utils.Rpc.Frame;
using Phantom.Utils.Rpc.Frame.Types; using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime.Utils; using Serilog;
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcSendChannel<TMessageBase> : IDisposable { public sealed class RpcSendChannel<TMessageBase> : IDisposable {
private readonly ILogger logger;
private readonly IRpcConnectionProvider connectionProvider; private readonly IRpcConnectionProvider connectionProvider;
private readonly MessageRegistry<TMessageBase> messageRegistry; private readonly MessageRegistry<TMessageBase> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker; private readonly MessageReplyTracker messageReplyTracker;
private readonly Channel<IFrame> sendQueue; private readonly Channel<IFrame> sendQueue;
private readonly Task sendQueueTask; private readonly Task sendQueueTask;
private readonly Task pingTask;
private readonly CancellationTokenSource cancellationTokenSource = new (); private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly CancellationTokenSource pingCancellationTokenSource = new ();
private uint nextMessageId; private uint nextMessageId;
internal RpcSendChannel(string loggerName, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry, int sendQueueCapacity) { internal RpcSendChannel(string loggerName, RpcClientConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
this.connectionProvider = connectionProvider; this.connectionProvider = connectionProvider;
this.messageRegistry = messageRegistry; this.messageRegistry = messageRegistry;
this.messageReplyTracker = new MessageReplyTracker(loggerName); this.messageReplyTracker = new MessageReplyTracker(loggerName);
this.sendQueue = Channel.CreateBounded<IFrame>(new BoundedChannelOptions(sendQueueCapacity) { this.sendQueue = Channel.CreateBounded<IFrame>(new BoundedChannelOptions(connectionParameters.SendQueueCapacity) {
AllowSynchronousContinuations = false, AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait, FullMode = BoundedChannelFullMode.Wait,
SingleReader = true, SingleReader = true,
@@ -32,6 +38,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
}); });
this.sendQueueTask = ProcessSendQueue(); this.sendQueueTask = ProcessSendQueue();
this.pingTask = Ping(connectionParameters.PingInterval);
} }
public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase { public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase {
@@ -81,7 +88,9 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
// TODO figure out cancellation // TODO figure out cancellation
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync(cancellationToken)) { await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync(cancellationToken)) {
while (!cancellationToken.IsCancellationRequested) { while (true) {
cancellationToken.ThrowIfCancellationRequested();
Stream stream; Stream stream;
try { try {
stream = await connectionProvider.GetStream(); stream = await connectionProvider.GetStream();
@@ -98,24 +107,33 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
} }
} }
[SuppressMessage("ReSharper", "FunctionNeverReturns")]
private async Task Ping(TimeSpan interval) {
CancellationToken cancellationToken = pingCancellationTokenSource.Token;
while (true) {
await Task.Delay(interval, cancellationToken);
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
logger.Warning("Skipped a ping due to a full queue.");
}
}
}
internal void ReceiveReply(ReplyFrame frame) { internal void ReceiveReply(ReplyFrame frame) {
messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply); messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply);
} }
internal void ReceiveError(uint messageId, RpcError error) { internal void ReceiveError(uint messageId, RpcError error) {
messageReplyTracker.FailReply(messageId, error switch { messageReplyTracker.FailReply(messageId, RpcErrorException.From(error));
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
_ => new RpcErrorException("Unknown error", error),
});
} }
internal async Task Close() { internal async Task Close() {
await pingCancellationTokenSource.CancelAsync();
sendQueue.Writer.TryComplete(); sendQueue.Writer.TryComplete();
try { try {
await sendQueueTask; await Task.WhenAll(sendQueueTask, pingTask);
} catch (Exception) { } catch (Exception) {
// Ignore. // Ignore.
} }
@@ -124,5 +142,6 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
public void Dispose() { public void Dispose() {
sendQueueTask.Dispose(); sendQueueTask.Dispose();
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();
pingCancellationTokenSource.Dispose();
} }
} }

View File

@@ -1,22 +0,0 @@
using System.Buffers;
namespace Phantom.Utils.Rpc.Runtime.Utils;
readonly record struct RentedMemory<T>(T[] Array, int Length) : IDisposable {
public Span<T> AsSpan => Array.AsSpan(..Length);
public Memory<T> AsMemory => Array.AsMemory(..Length);
public void Dispose() {
ArrayPool<T>.Shared.Return(Array);
}
public static RentedMemory<T> Rent(int bytes) {
T[] buffer = ArrayPool<T>.Shared.Rent(bytes);
try {
return new RentedMemory<T>(buffer, bytes);
} catch (Exception) {
ArrayPool<T>.Shared.Return(buffer);
throw;
}
}
}

View File

@@ -1,7 +0,0 @@
using Phantom.Utils.Actor;
namespace Phantom.Utils.Rpc.Runtime2;
public interface IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> where TRegistrationMessage : TServerMessage {
Task<Props<TServerMessage>?> TryRegister(RpcConnectionToClient<TClientMessage> connection, TRegistrationMessage message);
}

View File

@@ -2,9 +2,9 @@
using System.Buffers.Binary; using System.Buffers.Binary;
using MemoryPack; using MemoryPack;
namespace Phantom.Utils.Rpc.Runtime.Utils; namespace Phantom.Utils.Rpc;
public static class Serialization { static class Serialization {
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8; private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) { private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
@@ -66,4 +66,23 @@ public static class Serialization {
public static T Deserialize<T>(ReadOnlyMemory<byte> buffer) { public static T Deserialize<T>(ReadOnlyMemory<byte> buffer) {
return MemoryPackSerializer.Deserialize<T>(buffer.Span, SerializerOptions)!; return MemoryPackSerializer.Deserialize<T>(buffer.Span, SerializerOptions)!;
} }
private readonly record struct RentedMemory<T>(T[] Array, int Length) : IDisposable {
public Span<T> AsSpan => Array.AsSpan(..Length);
public Memory<T> AsMemory => Array.AsMemory(..Length);
public void Dispose() {
ArrayPool<T>.Shared.Return(Array);
}
public static RentedMemory<T> Rent(int bytes) {
T[] buffer = ArrayPool<T>.Shared.Rent(bytes);
try {
return new RentedMemory<T>(buffer, bytes);
} catch (Exception) {
ArrayPool<T>.Shared.Return(buffer);
throw;
}
}
}
} }

View File

@@ -1,24 +1,19 @@
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services.Rpc;
public sealed class ControllerConnection { public sealed class ControllerConnection(RpcSendChannel<IMessageToController> connection) {
private readonly RpcConnectionToServer<IMessageToController> connection; public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.SendMessage(message, CancellationToken.None);
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
}
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.Send(message);
} }
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> { public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken); return connection.SendMessage<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
} }
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController, ICanReply<TReply> { public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken); return connection.SendMessage<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken);
} }
} }

View File

@@ -9,7 +9,6 @@ namespace Phantom.Web.Services.Rpc;
sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> { sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
public readonly record struct Init( public readonly record struct Init(
RpcConnectionToServer<IMessageToController> Connection,
AgentManager AgentManager, AgentManager AgentManager,
InstanceManager InstanceManager, InstanceManager InstanceManager,
InstanceLogManager InstanceLogManager, InstanceLogManager InstanceLogManager,
@@ -21,7 +20,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
return Props<IMessageToWeb>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume }); return Props<IMessageToWeb>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
} }
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager; private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager; private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager; private readonly InstanceLogManager instanceLogManager;
@@ -29,7 +27,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
private readonly TaskCompletionSource<bool> registerSuccessWaiter; private readonly TaskCompletionSource<bool> registerSuccessWaiter;
private ControllerMessageHandlerActor(Init init) { private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agentManager = init.AgentManager; this.agentManager = init.AgentManager;
this.instanceManager = init.InstanceManager; this.instanceManager = init.InstanceManager;
this.instanceLogManager = init.InstanceLogManager; this.instanceLogManager = init.InstanceLogManager;
@@ -41,7 +38,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
Receive<RefreshInstancesMessage>(HandleRefreshInstances); Receive<RefreshInstancesMessage>(HandleRefreshInstances);
Receive<InstanceOutputMessage>(HandleInstanceOutput); Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<RefreshUserSessionMessage>(HandleRefreshUserSession); Receive<RefreshUserSessionMessage>(HandleRefreshUserSession);
Receive<ReplyMessage>(HandleReply);
} }
private void HandleRegisterWebResult(RegisterWebResultMessage message) { private void HandleRegisterWebResult(RegisterWebResultMessage message) {
@@ -63,8 +59,4 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
private void HandleRefreshUserSession(RefreshUserSessionMessage message) { private void HandleRefreshUserSession(RefreshUserSessionMessage message) {
userSessionRefreshManager.RefreshUser(message.UserGuid); userSessionRefreshManager.RefreshUser(message.UserGuid);
} }
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
} }

View File

@@ -8,29 +8,20 @@ using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services.Rpc;
public sealed class ControllerMessageHandlerFactory { public sealed class ControllerMessageHandlerFactory(
private readonly RpcConnectionToServer<IMessageToController> connection; AgentManager agentManager,
private readonly AgentManager agentManager; InstanceManager instanceManager,
private readonly InstanceManager instanceManager; InstanceLogManager instanceLogManager,
private readonly InstanceLogManager instanceLogManager; UserSessionRefreshManager userSessionRefreshManager
private readonly UserSessionRefreshManager userSessionRefreshManager; ) {
private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>(); private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>();
public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task; public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task;
private int messageHandlerId = 0; private int messageHandlerId = 0;
public ControllerMessageHandlerFactory(RpcConnectionToServer<IMessageToController> connection, AgentManager agentManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, UserSessionRefreshManager userSessionRefreshManager) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
this.userSessionRefreshManager = userSessionRefreshManager;
}
public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) { public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) {
var init = new ControllerMessageHandlerActor.Init(connection, agentManager, instanceManager, instanceLogManager, userSessionRefreshManager, registerSuccessWaiter); var init = new ControllerMessageHandlerActor.Init(agentManager, instanceManager, instanceLogManager, userSessionRefreshManager, registerSuccessWaiter);
var name = "ControllerMessageHandler-" + Interlocked.Increment(ref messageHandlerId); var name = "ControllerMessageHandler-" + Interlocked.Increment(ref messageHandlerId);
return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(init), name); return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(init), name);
} }

View File

@@ -1,24 +0,0 @@
using NetMQ.Sockets;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Actor;
using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWeb, IMessageToController, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToWeb, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToWeb> handlerActorRef, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, handlerActorRef, disconnectSemaphore, receiveCancellationToken).Launch();
}
private RpcClientRuntime(RpcClientSocket<IMessageToWeb, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToWeb> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, handlerActor, disconnectSemaphore, receiveCancellationToken) {}
protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray();
try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
} catch (TimeoutException) {
logger.Error("Timed out communicating web shutdown with the controller.");
}
}
}

View File

@@ -1,10 +1,16 @@
using System.Reflection; using System.Reflection;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Threading;
using Phantom.Web; using Phantom.Web;
using Phantom.Web.Services; using Phantom.Web.Services;
using Phantom.Web.Services.Rpc;
var shutdownCancellationTokenSource = new CancellationTokenSource(); var shutdownCancellationTokenSource = new CancellationTokenSource();
var shutdownCancellationToken = shutdownCancellationTokenSource.Token; var shutdownCancellationToken = shutdownCancellationTokenSource.Token;
@@ -42,50 +48,61 @@ try {
string dataProtectionKeysPath = Path.GetFullPath("./keys"); string dataProtectionKeysPath = Path.GetFullPath("./keys");
CreateFolderOrStop(dataProtectionKeysPath, Chmod.URWX); CreateFolderOrStop(dataProtectionKeysPath, Chmod.URWX);
var (controllerCertificate, webToken) = webKey.Value; var (certificateThumbprint, authToken) = webKey.Value;
var administratorToken = TokenGenerator.Create(60); var administratorToken = TokenGenerator.Create(60);
var applicationProperties = new ApplicationProperties(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken)); var applicationProperties = new ApplicationProperties(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken));
// var rpcConfiguration = new RpcConfiguration("Web", controllerHost, controllerPort, controllerCertificate); var rpcClientConnectionParameters = new RpcClientConnectionParameters(
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken)); Host: controllerHost,
// Port: controllerPort,
// var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken); DistinguishedName: "phantom-controller",
// var webApplication = WebLauncher.CreateApplication(webConfiguration, applicationProperties, rpcSocket.Connection); CertificateThumbprint: certificateThumbprint,
// SendQueueCapacity: 500,
// using var actorSystem = ActorSystemFactory.Create("Web"); PingInterval: TimeSpan.FromSeconds(10)
// );
// ControllerMessageHandlerFactory messageHandlerFactory;
// await using (var scope = webApplication.Services.CreateAsyncScope()) { using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, null, WebMessageRegistries.Definitions, shutdownCancellationToken);
// messageHandlerFactory = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerFactory>(); if (rpcClient == null) {
// } return 1;
// }
// var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
// var rpcTask = RpcClientRuntime.Launch(rpcSocket, messageHandlerFactory.Create(actorSystem), rpcDisconnectSemaphore, shutdownCancellationToken); var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
// try { var webApplication = WebLauncher.CreateApplication(webConfiguration, applicationProperties, rpcClient.SendChannel);
// PhantomLogger.Root.Information("Registering with the controller...");
// if (await messageHandlerFactory.RegisterSuccessWaiter) { using var actorSystem = ActorSystemFactory.Create("Web");
// PhantomLogger.Root.Information("Successfully registered with the controller.");
// } ControllerMessageHandlerFactory messageHandlerFactory;
// else { await using (var scope = webApplication.Services.CreateAsyncScope()) {
// PhantomLogger.Root.Fatal("Failed to register with the controller."); messageHandlerFactory = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerFactory>();
// return 1; }
// }
// Task? rpcClientListener = null;
// PhantomLogger.Root.InformationHeading("Launching Phantom Panel web..."); try {
// PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken); PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
// PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup"); PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
// PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
// await WebLauncher.Launch(webConfiguration, webApplication);
// } finally { await WebLauncher.Launch(webConfiguration, webApplication);
// shutdownCancellationTokenSource.Cancel();
// PhantomLogger.Root.Information("Phantom Panel web is ready.");
// rpcDisconnectSemaphore.Release(); rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
// await rpcTask;
// rpcDisconnectSemaphore.Dispose(); await shutdownCancellationToken.WaitHandle.WaitOneAsync();
// } finally {
// NetMQConfig.Cleanup(); try {
// } await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage(), CancellationToken.None);
// TODO wait for acknowledgment
} catch (Exception e) {
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
} finally {
await rpcClient.Shutdown();
if (rpcClientListener != null) {
await rpcClientListener;
}
}
}
return 0; return 0;
} catch (OperationCanceledException) { } catch (OperationCanceledException) {

View File

@@ -1,5 +1,6 @@
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Web.Services; using Phantom.Web.Services;
using Serilog; using Serilog;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
@@ -11,7 +12,7 @@ static class WebLauncher {
public string HttpUrl => "http://" + Host + ":" + Port; public string HttpUrl => "http://" + Host + ":" + Port;
} }
internal static WebApplication CreateApplication(Configuration config, ApplicationProperties applicationProperties, RpcConnectionToServer<IMessageToController> controllerConnection) { internal static WebApplication CreateApplication(Configuration config, ApplicationProperties applicationProperties, RpcSendChannel<IMessageToController> sendChannel) {
var assembly = typeof(WebLauncher).Assembly; var assembly = typeof(WebLauncher).Assembly;
var builder = WebApplication.CreateBuilder(new WebApplicationOptions { var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
ApplicationName = assembly.GetName().Name, ApplicationName = assembly.GetName().Name,
@@ -28,7 +29,7 @@ static class WebLauncher {
} }
builder.Services.AddSingleton(applicationProperties); builder.Services.AddSingleton(applicationProperties);
builder.Services.AddSingleton(controllerConnection); builder.Services.AddSingleton(sendChannel);
builder.Services.AddPhantomServices(); builder.Services.AddPhantomServices();
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime()); builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());