1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-17 12:24:49 +02:00

2 Commits

Author SHA1 Message Date
47e563f7ce Replace NetMQ with custom TCP logic 2025-09-10 20:15:25 +02:00
29c403f2d4 Replace NetMQ with custom TCP logic 2025-09-10 19:48:32 +02:00
36 changed files with 202 additions and 251 deletions

View File

@@ -1,43 +0,0 @@
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Agent.Rpc;
sealed class KeepAliveLoop {
private static readonly ILogger Logger = PhantomLogger.Create<KeepAliveLoop>();
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly CancellationTokenSource cancellationTokenSource = new ();
public KeepAliveLoop(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
Task.Run(Run);
}
private async Task Run() {
var cancellationToken = cancellationTokenSource.Token;
try {
await connection.IsReady.WaitAsync(cancellationToken);
Logger.Information("Started keep-alive loop.");
while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
}
} catch (OperationCanceledException) {
// Ignore.
} finally {
cancellationTokenSource.Dispose();
Logger.Information("Stopped keep-alive loop.");
}
}
public void Cancel() {
cancellationTokenSource.Cancel();
}
}

View File

@@ -1,12 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
</ItemGroup>
</Project>

View File

@@ -1,8 +1,8 @@
using Akka.Actor;
using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;

View File

@@ -4,8 +4,8 @@ using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Minecraft.Launcher.Types;
using Phantom.Agent.Minecraft.Properties;
using Phantom.Agent.Minecraft.Server;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;

View File

@@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Rpc;
namespace Phantom.Agent.Services.Instances;

View File

@@ -1,4 +1,4 @@
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;

View File

@@ -1,6 +1,6 @@
using System.Collections.Immutable;
using System.Threading.Channels;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks;

View File

@@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Instance;
using Phantom.Agent.Minecraft.Server;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Instance;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;

View File

@@ -8,7 +8,6 @@
<ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj" />
<ProjectReference Include="..\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj" />
</ItemGroup>
</Project>

View File

@@ -1,7 +1,7 @@
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Agent.Rpc;
namespace Phantom.Agent.Services.Rpc;
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> sendChannel) {
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {

View File

@@ -1,6 +1,6 @@
using Phantom.Common.Data;
namespace Phantom.Agent.Rpc;
namespace Phantom.Agent.Services.Rpc;
sealed class RpcClientAgentHandshake(AuthToken authToken) {

View File

@@ -1,6 +1,5 @@
using System.Reflection;
using Phantom.Agent;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services;
using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent;
@@ -50,24 +49,31 @@ try {
var (certificateThumbprint, authToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var rpcClientConnectionParameters = new RpcClientConnectionParameters(
Host: controllerHost,
Port: controllerPort,
DistinguishedName: "phantom-controller",
CertificateThumbprint: certificateThumbprint,
SendQueueCapacity: 500,
PingInterval: TimeSpan.FromSeconds(10)
);
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", controllerHost, controllerPort, "phantom-controller", certificateThumbprint, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
using var rpcClient = await RpcClient<IMessageToController, IMessageToAgent>.Connect("Controller", rpcClientConnectionParameters, null, AgentMessageRegistries.Definitions, shutdownCancellationToken);
if (rpcClient == null) {
return 1;
}
Task? rpcClientListener = null;
try {
// var rpcConfiguration = new RpcConfiguration("Agent", controllerHost, controllerPort, controllerCertificate);
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
//
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcClient.SendChannel));
await agentServices.Initialize();
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(rpcClient.SendChannel, agentServices, shutdownCancellationTokenSource);
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
await shutdownCancellationToken.WaitHandle.WaitOneAsync();

View File

@@ -3,7 +3,6 @@ using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent.Handshake;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Serilog;
namespace Phantom.Controller.Services.Rpc;

View File

@@ -18,8 +18,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent", "Agent\Phan
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Minecraft", "Agent\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj", "{9FE000D0-91AC-4CB4-8956-91CCC0270015}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Rpc", "Agent\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj", "{665C7B87-0165-48BC-B6A6-17A3812A70C9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Agent.Services", "Agent\Phantom.Agent.Services\Phantom.Agent.Services.csproj", "{AEE8B77E-AB07-423F-9981-8CD829ACB834}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data", "Common\Phantom.Common.Data\Phantom.Common.Data.csproj", "{6C3DB1E5-F695-4D70-8F3A-78C2957274BE}"
@@ -76,10 +74,6 @@ Global
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9FE000D0-91AC-4CB4-8956-91CCC0270015}.Release|Any CPU.Build.0 = Release|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{665C7B87-0165-48BC-B6A6-17A3812A70C9}.Release|Any CPU.Build.0 = Release|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AEE8B77E-AB07-423F-9981-8CD829ACB834}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -164,7 +158,6 @@ Global
GlobalSection(NestedProjects) = preSolution
{418BE1BF-9F63-4B46-B4E4-DF64C3B3DDA7} = {F5878792-64C8-4ECF-A075-66341FF97127}
{9FE000D0-91AC-4CB4-8956-91CCC0270015} = {F5878792-64C8-4ECF-A075-66341FF97127}
{665C7B87-0165-48BC-B6A6-17A3812A70C9} = {F5878792-64C8-4ECF-A075-66341FF97127}
{AEE8B77E-AB07-423F-9981-8CD829ACB834} = {F5878792-64C8-4ECF-A075-66341FF97127}
{6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}

View File

@@ -3,10 +3,12 @@
namespace Phantom.Utils.Rpc.Frame;
interface IFrame {
private const byte TypePingId = 0;
private const byte TypeMessageId = 1;
private const byte TypeReplyId = 2;
private const byte TypeErrorId = 3;
static readonly ReadOnlyMemory<byte> TypePing = new ([TypePingId]);
static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]);
static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]);
static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]);
@@ -18,6 +20,9 @@ interface IFrame {
await stream.ReadExactlyAsync(oneByteBuffer, cancellationToken);
switch (oneByteBuffer[0]) {
case TypePingId:
break;
case TypeMessageId:
var messageFrame = await MessageFrame.Read(stream, cancellationToken);
await reader.OnMessageFrame(messageFrame, stream, cancellationToken);

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -0,0 +1,11 @@
namespace Phantom.Utils.Rpc.Frame.Types;
sealed record PingFrame : IFrame {
public static PingFrame Instance { get; } = new PingFrame();
public ReadOnlyMemory<byte> Type => IFrame.TypePing;
public Task Write(Stream stream, CancellationToken cancellationToken) {
return Task.CompletedTask;
}
}

View File

@@ -1,5 +1,4 @@
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc.Frame.Types;

View File

@@ -1,7 +1,6 @@
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Serilog;
namespace Phantom.Utils.Rpc.Message;

View File

@@ -1,7 +1,6 @@
using System.Collections.Concurrent;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Runtime.Utils;
using Phantom.Utils.Tasks;
using Serilog;

View File

@@ -3,16 +3,15 @@ using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Frame;
using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime.Tls;
using Serilog;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
RpcClientConnector connector = new RpcClientConnector(loggerName, host, port, distinguishedName, certificateThumbprint, handshake);
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, RpcClientHandshake handshake, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters, handshake);
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, messageDefinitions, connector, connection);
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
}
private readonly ILogger logger;
@@ -21,12 +20,12 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
private RpcClient(string loggerName, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) {
private RpcClient(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) {
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
this.serverToClientMessageRegistry = messageDefinitions.ToClient;
this.connection = new RpcClientConnection(loggerName, connector, connection);
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, this.connection, messageDefinitions.ToServer, sendQueueCapacity: 500);
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters, this.connection, messageDefinitions.ToServer);
}
public async Task Listen(ActorRef<TServerToClientMessage> actor) {

View File

@@ -0,0 +1,12 @@
using Phantom.Utils.Rpc.Runtime.Tls;
namespace Phantom.Utils.Rpc.Runtime;
public readonly record struct RpcClientConnectionParameters(
string Host,
ushort Port,
string DistinguishedName,
RpcCertificateThumbprint CertificateThumbprint,
ushort SendQueueCapacity,
TimeSpan PingInterval
);

View File

@@ -15,19 +15,15 @@ internal sealed class RpcClientConnector {
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
private readonly ILogger logger;
private readonly string host;
private readonly ushort port;
private readonly RpcCertificateThumbprint certificateThumbprint;
private readonly RpcClientConnectionParameters parameters;
private readonly RpcClientHandshake handshake;
private readonly SslClientAuthenticationOptions sslOptions;
private bool loggedCertificateValidationError = false;
public RpcClientConnector(string loggerName, string host, ushort port, string distinguishedName, RpcCertificateThumbprint certificateThumbprint, RpcClientHandshake handshake) {
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters, RpcClientHandshake handshake) {
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
this.host = host;
this.port = port;
this.certificateThumbprint = certificateThumbprint;
this.parameters = parameters;
this.handshake = handshake;
this.sslOptions = new SslClientAuthenticationOptions {
@@ -37,7 +33,7 @@ internal sealed class RpcClientConnector {
EnabledSslProtocols = TlsSupport.SupportedProtocols,
EncryptionPolicy = EncryptionPolicy.RequireEncryption,
RemoteCertificateValidationCallback = ValidateServerCertificate,
TargetHost = distinguishedName,
TargetHost = parameters.DistinguishedName,
};
}
@@ -65,11 +61,11 @@ internal sealed class RpcClientConnector {
}
internal async Task<Connection?> EstablishNewConnection(CancellationToken cancellationToken) {
logger.Information("Connecting to {Host}:{Port}...", host, port);
logger.Information("Connecting to {Host}:{Port}...", parameters.Host, parameters.Port);
Socket clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
try {
await clientSocket.ConnectAsync(host, port, cancellationToken);
await clientSocket.ConnectAsync(parameters.Host, parameters.Port, cancellationToken);
} catch (Exception e) {
logger.Error(e, "Could not connect.");
throw;
@@ -129,7 +125,7 @@ internal sealed class RpcClientConnector {
else if (sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) {
logger.Error("Could not establish a secure connection, server certificate has the wrong name: {Name}", certificate.Subject);
}
else if (!certificateThumbprint.Check(certificate)) {
else if (!parameters.CertificateThumbprint.Check(certificate)) {
logger.Error("Could not establish a secure connection, server certificate does not match.");
}
else if (TlsSupport.CheckAlgorithm((X509Certificate2) certificate) is {} error) {

View File

@@ -1,5 +1,20 @@
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcErrorException(string message, RpcError error) : Exception(message) {
public RpcError Error => error;
public sealed class RpcErrorException : Exception {
internal static RpcErrorException From(RpcError error) {
return error switch {
RpcError.InvalidData => new RpcErrorException("Invalid data", error),
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
RpcError.MessageTooLarge => new RpcErrorException("Message is too large", error),
_ => new RpcErrorException("Unknown error", error),
};
}
public RpcError Error { get; }
internal RpcErrorException(string message, RpcError error) : base(message) {
this.Error = error;
}
}

View File

@@ -1,5 +0,0 @@
namespace Phantom.Utils.Rpc.Runtime;
sealed class RpcReceiveChannel {
}

View File

@@ -1,30 +1,36 @@
using System.Threading.Channels;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Channels;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Frame;
using Phantom.Utils.Rpc.Frame.Types;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime.Utils;
using Serilog;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcSendChannel<TMessageBase> : IDisposable {
private readonly ILogger logger;
private readonly IRpcConnectionProvider connectionProvider;
private readonly MessageRegistry<TMessageBase> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker;
private readonly Channel<IFrame> sendQueue;
private readonly Task sendQueueTask;
private readonly Task pingTask;
private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly CancellationTokenSource pingCancellationTokenSource = new ();
private uint nextMessageId;
internal RpcSendChannel(string loggerName, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry, int sendQueueCapacity) {
internal RpcSendChannel(string loggerName, RpcClientConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
this.connectionProvider = connectionProvider;
this.messageRegistry = messageRegistry;
this.messageReplyTracker = new MessageReplyTracker(loggerName);
this.sendQueue = Channel.CreateBounded<IFrame>(new BoundedChannelOptions(sendQueueCapacity) {
this.sendQueue = Channel.CreateBounded<IFrame>(new BoundedChannelOptions(connectionParameters.SendQueueCapacity) {
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
@@ -32,6 +38,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
});
this.sendQueueTask = ProcessSendQueue();
this.pingTask = Ping(connectionParameters.PingInterval);
}
public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase {
@@ -81,7 +88,9 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
// TODO figure out cancellation
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync(cancellationToken)) {
while (!cancellationToken.IsCancellationRequested) {
while (true) {
cancellationToken.ThrowIfCancellationRequested();
Stream stream;
try {
stream = await connectionProvider.GetStream();
@@ -98,24 +107,33 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
}
}
[SuppressMessage("ReSharper", "FunctionNeverReturns")]
private async Task Ping(TimeSpan interval) {
CancellationToken cancellationToken = pingCancellationTokenSource.Token;
while (true) {
await Task.Delay(interval, cancellationToken);
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
logger.Warning("Skipped a ping due to a full queue.");
}
}
}
internal void ReceiveReply(ReplyFrame frame) {
messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply);
}
internal void ReceiveError(uint messageId, RpcError error) {
messageReplyTracker.FailReply(messageId, error switch {
RpcError.UnknownMessageRegistryCode => new RpcErrorException("Unknown message registry code", error),
RpcError.MessageDeserializationError => new RpcErrorException("Message deserialization error", error),
RpcError.MessageHandlingError => new RpcErrorException("Message handling error", error),
_ => new RpcErrorException("Unknown error", error),
});
messageReplyTracker.FailReply(messageId, RpcErrorException.From(error));
}
internal async Task Close() {
await pingCancellationTokenSource.CancelAsync();
sendQueue.Writer.TryComplete();
try {
await sendQueueTask;
await Task.WhenAll(sendQueueTask, pingTask);
} catch (Exception) {
// Ignore.
}
@@ -124,5 +142,6 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
public void Dispose() {
sendQueueTask.Dispose();
cancellationTokenSource.Dispose();
pingCancellationTokenSource.Dispose();
}
}

View File

@@ -1,22 +0,0 @@
using System.Buffers;
namespace Phantom.Utils.Rpc.Runtime.Utils;
readonly record struct RentedMemory<T>(T[] Array, int Length) : IDisposable {
public Span<T> AsSpan => Array.AsSpan(..Length);
public Memory<T> AsMemory => Array.AsMemory(..Length);
public void Dispose() {
ArrayPool<T>.Shared.Return(Array);
}
public static RentedMemory<T> Rent(int bytes) {
T[] buffer = ArrayPool<T>.Shared.Rent(bytes);
try {
return new RentedMemory<T>(buffer, bytes);
} catch (Exception) {
ArrayPool<T>.Shared.Return(buffer);
throw;
}
}
}

View File

@@ -1,7 +0,0 @@
using Phantom.Utils.Actor;
namespace Phantom.Utils.Rpc.Runtime2;
public interface IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> where TRegistrationMessage : TServerMessage {
Task<Props<TServerMessage>?> TryRegister(RpcConnectionToClient<TClientMessage> connection, TRegistrationMessage message);
}

View File

@@ -2,9 +2,9 @@
using System.Buffers.Binary;
using MemoryPack;
namespace Phantom.Utils.Rpc.Runtime.Utils;
namespace Phantom.Utils.Rpc;
public static class Serialization {
static class Serialization {
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
private static async ValueTask WritePrimitive<T>(T value, int size, Action<Span<byte>, T> writer, Stream stream, CancellationToken cancellationToken) {
@@ -66,4 +66,23 @@ public static class Serialization {
public static T Deserialize<T>(ReadOnlyMemory<byte> buffer) {
return MemoryPackSerializer.Deserialize<T>(buffer.Span, SerializerOptions)!;
}
private readonly record struct RentedMemory<T>(T[] Array, int Length) : IDisposable {
public Span<T> AsSpan => Array.AsSpan(..Length);
public Memory<T> AsMemory => Array.AsMemory(..Length);
public void Dispose() {
ArrayPool<T>.Shared.Return(Array);
}
public static RentedMemory<T> Rent(int bytes) {
T[] buffer = ArrayPool<T>.Shared.Rent(bytes);
try {
return new RentedMemory<T>(buffer, bytes);
} catch (Exception) {
ArrayPool<T>.Shared.Return(buffer);
throw;
}
}
}
}

View File

@@ -1,24 +1,19 @@
using Phantom.Common.Messages.Web;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerConnection {
private readonly RpcConnectionToServer<IMessageToController> connection;
public ControllerConnection(RpcConnectionToServer<IMessageToController> connection) {
this.connection = connection;
}
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.Send(message);
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> connection) {
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return connection.SendMessage(message, CancellationToken.None);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
return connection.SendMessage<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController, ICanReply<TReply> {
return connection.Send<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken);
return connection.SendMessage<TMessage, TReply>(message, Timeout.InfiniteTimeSpan, waitForReplyCancellationToken);
}
}

View File

@@ -9,7 +9,6 @@ namespace Phantom.Web.Services.Rpc;
sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
public readonly record struct Init(
RpcConnectionToServer<IMessageToController> Connection,
AgentManager AgentManager,
InstanceManager InstanceManager,
InstanceLogManager InstanceLogManager,
@@ -21,7 +20,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
return Props<IMessageToWeb>.Create(() => new ControllerMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
@@ -29,7 +27,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
private readonly TaskCompletionSource<bool> registerSuccessWaiter;
private ControllerMessageHandlerActor(Init init) {
this.connection = init.Connection;
this.agentManager = init.AgentManager;
this.instanceManager = init.InstanceManager;
this.instanceLogManager = init.InstanceLogManager;
@@ -41,7 +38,6 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
Receive<RefreshInstancesMessage>(HandleRefreshInstances);
Receive<InstanceOutputMessage>(HandleInstanceOutput);
Receive<RefreshUserSessionMessage>(HandleRefreshUserSession);
Receive<ReplyMessage>(HandleReply);
}
private void HandleRegisterWebResult(RegisterWebResultMessage message) {
@@ -63,8 +59,4 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
private void HandleRefreshUserSession(RefreshUserSessionMessage message) {
userSessionRefreshManager.RefreshUser(message.UserGuid);
}
private void HandleReply(ReplyMessage message) {
connection.Receive(message);
}
}

View File

@@ -8,29 +8,20 @@ using Phantom.Web.Services.Instances;
namespace Phantom.Web.Services.Rpc;
public sealed class ControllerMessageHandlerFactory {
private readonly RpcConnectionToServer<IMessageToController> connection;
private readonly AgentManager agentManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager;
private readonly UserSessionRefreshManager userSessionRefreshManager;
public sealed class ControllerMessageHandlerFactory(
AgentManager agentManager,
InstanceManager instanceManager,
InstanceLogManager instanceLogManager,
UserSessionRefreshManager userSessionRefreshManager
) {
private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>();
public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task;
private int messageHandlerId = 0;
public ControllerMessageHandlerFactory(RpcConnectionToServer<IMessageToController> connection, AgentManager agentManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, UserSessionRefreshManager userSessionRefreshManager) {
this.connection = connection;
this.agentManager = agentManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager;
this.userSessionRefreshManager = userSessionRefreshManager;
}
public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) {
var init = new ControllerMessageHandlerActor.Init(connection, agentManager, instanceManager, instanceLogManager, userSessionRefreshManager, registerSuccessWaiter);
var init = new ControllerMessageHandlerActor.Init(agentManager, instanceManager, instanceLogManager, userSessionRefreshManager, registerSuccessWaiter);
var name = "ControllerMessageHandler-" + Interlocked.Increment(ref messageHandlerId);
return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(init), name);
}

View File

@@ -1,24 +0,0 @@
using NetMQ.Sockets;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Actor;
using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWeb, IMessageToController, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToWeb, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToWeb> handlerActorRef, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, handlerActorRef, disconnectSemaphore, receiveCancellationToken).Launch();
}
private RpcClientRuntime(RpcClientSocket<IMessageToWeb, IMessageToController, ReplyMessage> socket, ActorRef<IMessageToWeb> handlerActor, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, handlerActor, disconnectSemaphore, receiveCancellationToken) {}
protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray();
try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
} catch (TimeoutException) {
logger.Error("Timed out communicating web shutdown with the controller.");
}
}
}

View File

@@ -1,10 +1,16 @@
using System.Reflection;
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Actor;
using Phantom.Utils.Cryptography;
using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Runtime;
using Phantom.Utils.Threading;
using Phantom.Web;
using Phantom.Web.Services;
using Phantom.Web.Services.Rpc;
var shutdownCancellationTokenSource = new CancellationTokenSource();
var shutdownCancellationToken = shutdownCancellationTokenSource.Token;
@@ -42,50 +48,61 @@ try {
string dataProtectionKeysPath = Path.GetFullPath("./keys");
CreateFolderOrStop(dataProtectionKeysPath, Chmod.URWX);
var (controllerCertificate, webToken) = webKey.Value;
var (certificateThumbprint, authToken) = webKey.Value;
var administratorToken = TokenGenerator.Create(60);
var applicationProperties = new ApplicationProperties(fullVersion, TokenGenerator.GetBytesOrThrow(administratorToken));
// var rpcConfiguration = new RpcConfiguration("Web", controllerHost, controllerPort, controllerCertificate);
// var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
//
// var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
// var webApplication = WebLauncher.CreateApplication(webConfiguration, applicationProperties, rpcSocket.Connection);
//
// using var actorSystem = ActorSystemFactory.Create("Web");
//
// ControllerMessageHandlerFactory messageHandlerFactory;
// await using (var scope = webApplication.Services.CreateAsyncScope()) {
// messageHandlerFactory = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerFactory>();
// }
//
// var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
// var rpcTask = RpcClientRuntime.Launch(rpcSocket, messageHandlerFactory.Create(actorSystem), rpcDisconnectSemaphore, shutdownCancellationToken);
// try {
// PhantomLogger.Root.Information("Registering with the controller...");
// if (await messageHandlerFactory.RegisterSuccessWaiter) {
// PhantomLogger.Root.Information("Successfully registered with the controller.");
// }
// else {
// PhantomLogger.Root.Fatal("Failed to register with the controller.");
// return 1;
// }
//
// PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
// PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
// PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
//
// await WebLauncher.Launch(webConfiguration, webApplication);
// } finally {
// shutdownCancellationTokenSource.Cancel();
//
// rpcDisconnectSemaphore.Release();
// await rpcTask;
// rpcDisconnectSemaphore.Dispose();
//
// NetMQConfig.Cleanup();
// }
var rpcClientConnectionParameters = new RpcClientConnectionParameters(
Host: controllerHost,
Port: controllerPort,
DistinguishedName: "phantom-controller",
CertificateThumbprint: certificateThumbprint,
SendQueueCapacity: 500,
PingInterval: TimeSpan.FromSeconds(10)
);
using var rpcClient = await RpcClient<IMessageToController, IMessageToWeb>.Connect("Controller", rpcClientConnectionParameters, null, WebMessageRegistries.Definitions, shutdownCancellationToken);
if (rpcClient == null) {
return 1;
}
var webConfiguration = new WebLauncher.Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
var webApplication = WebLauncher.CreateApplication(webConfiguration, applicationProperties, rpcClient.SendChannel);
using var actorSystem = ActorSystemFactory.Create("Web");
ControllerMessageHandlerFactory messageHandlerFactory;
await using (var scope = webApplication.Services.CreateAsyncScope()) {
messageHandlerFactory = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerFactory>();
}
Task? rpcClientListener = null;
try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
await WebLauncher.Launch(webConfiguration, webApplication);
PhantomLogger.Root.Information("Phantom Panel web is ready.");
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
} finally {
try {
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage(), CancellationToken.None);
// TODO wait for acknowledgment
} catch (Exception e) {
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
} finally {
await rpcClient.Shutdown();
if (rpcClientListener != null) {
await rpcClientListener;
}
}
}
return 0;
} catch (OperationCanceledException) {

View File

@@ -1,5 +1,6 @@
using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Web.Services;
using Serilog;
using ILogger = Serilog.ILogger;
@@ -11,7 +12,7 @@ static class WebLauncher {
public string HttpUrl => "http://" + Host + ":" + Port;
}
internal static WebApplication CreateApplication(Configuration config, ApplicationProperties applicationProperties, RpcConnectionToServer<IMessageToController> controllerConnection) {
internal static WebApplication CreateApplication(Configuration config, ApplicationProperties applicationProperties, RpcSendChannel<IMessageToController> sendChannel) {
var assembly = typeof(WebLauncher).Assembly;
var builder = WebApplication.CreateBuilder(new WebApplicationOptions {
ApplicationName = assembly.GetName().Name,
@@ -28,7 +29,7 @@ static class WebLauncher {
}
builder.Services.AddSingleton(applicationProperties);
builder.Services.AddSingleton(controllerConnection);
builder.Services.AddSingleton(sendChannel);
builder.Services.AddPhantomServices();
builder.Services.AddSingleton<IHostLifetime>(new NullLifetime());