1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-18 15:42:50 +02:00

Compare commits

..

2 Commits

58 changed files with 458 additions and 297 deletions

View File

@ -1,5 +1,5 @@
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Serilog; using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages\Phantom.Common.Messages.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,9 +1,9 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
@ -19,7 +19,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
options.CurveServerCertificate = config.ServerCertificate; options.CurveServerCertificate = config.ServerCertificate;
options.CurveCertificate = new NetMQCertificate(); options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = MessageRegistries.ToServer.Write(new RegisterAgentMessage(authToken, agentInfo)).ToArray(); options.HelloMessage = AgentMessageRegistries.ToController.Write(new RegisterAgentMessage(authToken, agentInfo)).ToArray();
return new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch();
} }
@ -63,7 +63,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
LogMessageType(logger, data); LogMessageType(logger, data);
if (data.Length > 0) { if (data.Length > 0) {
MessageRegistries.ToAgent.Handle(data, handler); AgentMessageRegistries.ToAgent.Handle(data, handler);
} }
} }
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
@ -81,7 +81,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
return; return;
} }
if (data.Length > 0 && MessageRegistries.ToAgent.TryGetType(data, out var type)) { if (data.Length > 0 && AgentMessageRegistries.ToAgent.TryGetType(data, out var type)) {
logger.Verbose("Received {MessageType} ({Bytes} B) from controller.", type.Name, data.Length); logger.Verbose("Received {MessageType} ({Bytes} B) from controller.", type.Name, data.Length);
} }
else { else {

View File

@ -1,7 +1,7 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
@ -15,17 +15,17 @@ public sealed class RpcServerConnection {
this.replyTracker = replyTracker; this.replyTracker = replyTracker;
} }
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer { internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
var bytes = MessageRegistries.ToServer.Write(message).ToArray(); var bytes = AgentMessageRegistries.ToController.Write(message).ToArray();
if (bytes.Length > 0) { if (bytes.Length > 0) {
await socket.SendAsync(bytes); await socket.SendAsync(bytes);
} }
} }
internal async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class { internal async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
var sequenceId = replyTracker.RegisterReply(); var sequenceId = replyTracker.RegisterReply();
var bytes = MessageRegistries.ToServer.Write<TMessage, TReply>(sequenceId, message).ToArray(); var bytes = AgentMessageRegistries.ToController.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) { if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId); replyTracker.ForgetReply(sequenceId);
return null; return null;

View File

@ -1,5 +1,5 @@
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Serilog; using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
@ -24,11 +24,11 @@ public static class ServerMessaging {
Logger.Information("Server connection ready."); Logger.Information("Server connection ready.");
} }
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer { public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToController {
return CurrentConnectionOrThrow.Send(message); return CurrentConnectionOrThrow.Send(message);
} }
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class { public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToController<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken); return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
} }
} }

View File

@ -6,7 +6,7 @@ using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;

View File

@ -2,7 +2,7 @@ using System.Collections.Immutable;
using System.Threading.Channels; using System.Threading.Channels;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;

View File

@ -14,7 +14,7 @@ using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages\Phantom.Common.Messages.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj" /> <ProjectReference Include="..\Phantom.Agent.Minecraft\Phantom.Agent.Minecraft.csproj" />
<ProjectReference Include="..\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj" /> <ProjectReference Include="..\Phantom.Agent.Rpc\Phantom.Agent.Rpc.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -2,10 +2,10 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Serilog; using Serilog;

View File

@ -27,7 +27,7 @@ public static class PhantomLogger {
} }
public static ILogger Create(string name1, string name2) { public static ILogger Create(string name1, string name2) {
return Create(name1 + ":" + name2); return Create(ConcatNames(name1, name2));
} }
public static ILogger Create<T>() { public static ILogger Create<T>() {
@ -38,10 +38,18 @@ public static class PhantomLogger {
return Create(typeof(T).Name, name); return Create(typeof(T).Name, name);
} }
public static ILogger Create<T>(string name1, string name2) {
return Create(typeof(T).Name, ConcatNames(name1, name2));
}
public static ILogger Create<T1, T2>() { public static ILogger Create<T1, T2>() {
return Create(typeof(T1).Name, typeof(T2).Name); return Create(typeof(T1).Name, typeof(T2).Name);
} }
private static string ConcatNames(string name1, string name2) {
return name1 + ":" + name2;
}
public static void Dispose() { public static void Dispose() {
Root.Dispose(); Root.Dispose();
Base.Dispose(); Base.Dispose();

View File

@ -0,0 +1,48 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public static class AgentMessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToAgent)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
static AgentMessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
ToAgent.Add<RegisterAgentFailureMessage>(1);
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
ToAgent.Add<ReplyMessage>(127);
ToController.Add<RegisterAgentMessage>(0);
ToController.Add<UnregisterAgentMessage>(1);
ToController.Add<AgentIsAliveMessage>(2);
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
ToController.Add<ReportInstanceStatusMessage>(4);
ToController.Add<InstanceOutputMessage>(5);
ToController.Add<ReportAgentStatusMessage>(6);
ToController.Add<ReportInstanceEventMessage>(7);
ToController.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToAgentListener, IMessageToControllerListener> {
public MessageRegistry<IMessageToAgentListener> Outgoing => ToAgent;
public MessageRegistry<IMessageToControllerListener> Incoming => ToController;
public bool IsRegistrationMessage(Type messageType) {
return messageType == typeof(RegisterAgentMessage);
}
public IMessage<IMessageToAgentListener, NoReply> CreateReplyMessage( uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
}
}

View File

@ -1,14 +1,14 @@
using MemoryPack; using MemoryPack;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.BiDirectional; namespace Phantom.Common.Messages.Agent.BiDirectional;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ReplyMessage( public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId, [property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply [property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToServer, IMessageToAgent { ) : IMessageToController, IMessageToAgent {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this); return listener.HandleReply(this);
} }

View File

@ -1,6 +1,6 @@
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages; namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {} public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}

View File

@ -1,9 +1,9 @@
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages; namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgentListener { public interface IMessageToAgentListener {
Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message); Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message);

View File

@ -0,0 +1,7 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {}
public interface IMessageToController : IMessageToController<NoReply> {}

View File

@ -1,11 +1,10 @@
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages; namespace Phantom.Common.Messages.Agent;
public interface IMessageToServerListener { public interface IMessageToControllerListener {
bool IsDisposed { get; }
Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message); Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message);
Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message); Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message);
Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message); Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message);

View File

@ -2,7 +2,7 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ConfigureInstanceMessage( public sealed partial record ConfigureInstanceMessage(

View File

@ -1,7 +1,7 @@
using MemoryPack; using MemoryPack;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record LaunchInstanceMessage( public sealed partial record LaunchInstanceMessage(

View File

@ -2,7 +2,7 @@
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentFailureMessage( public sealed partial record RegisterAgentFailureMessage(

View File

@ -2,7 +2,7 @@
using MemoryPack; using MemoryPack;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentSuccessMessage( public sealed partial record RegisterAgentSuccessMessage(

View File

@ -1,7 +1,7 @@
using MemoryPack; using MemoryPack;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record SendCommandToInstanceMessage( public sealed partial record SendCommandToInstanceMessage(

View File

@ -2,7 +2,7 @@
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent; namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record StopInstanceMessage( public sealed partial record StopInstanceMessage(

View File

@ -3,13 +3,13 @@ using MemoryPack;
using Phantom.Common.Data.Java; using Phantom.Common.Data.Java;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AdvertiseJavaRuntimesMessage( public sealed partial record AdvertiseJavaRuntimesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes [property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAdvertiseJavaRuntimes(this); return listener.HandleAdvertiseJavaRuntimes(this);
} }
} }

View File

@ -0,0 +1,11 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsAliveMessage : IMessageToController {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleAgentIsAlive(this);
}
}

View File

@ -2,14 +2,14 @@ using System.Collections.Immutable;
using MemoryPack; using MemoryPack;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record InstanceOutputMessage( public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid, [property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines [property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleInstanceOutput(this); return listener.HandleInstanceOutput(this);
} }
} }

View File

@ -2,14 +2,14 @@
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RegisterAgentMessage( public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AuthToken AuthToken, [property: MemoryPackOrder(0)] AuthToken AuthToken,
[property: MemoryPackOrder(1)] AgentInfo AgentInfo [property: MemoryPackOrder(1)] AgentInfo AgentInfo
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleRegisterAgent(this); return listener.HandleRegisterAgent(this);
} }
} }

View File

@ -2,14 +2,14 @@
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ReportAgentStatusMessage( public sealed partial record ReportAgentStatusMessage(
[property: MemoryPackOrder(0)] int RunningInstanceCount, [property: MemoryPackOrder(0)] int RunningInstanceCount,
[property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory [property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportAgentStatus(this); return listener.HandleReportAgentStatus(this);
} }
} }

View File

@ -2,7 +2,7 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ReportInstanceEventMessage( public sealed partial record ReportInstanceEventMessage(
@ -10,8 +10,8 @@ public sealed partial record ReportInstanceEventMessage(
[property: MemoryPackOrder(1)] DateTime UtcTime, [property: MemoryPackOrder(1)] DateTime UtcTime,
[property: MemoryPackOrder(2)] Guid InstanceGuid, [property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] IInstanceEvent Event [property: MemoryPackOrder(3)] IInstanceEvent Event
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceEvent(this); return listener.HandleReportInstanceEvent(this);
} }
} }

View File

@ -2,14 +2,14 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ReportInstanceStatusMessage( public sealed partial record ReportInstanceStatusMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid, [property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus [property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReportInstanceStatus(this); return listener.HandleReportInstanceStatus(this);
} }
} }

View File

@ -1,13 +1,13 @@
using MemoryPack; using MemoryPack;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer; namespace Phantom.Common.Messages.Agent.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record UnregisterAgentMessage( public sealed partial record UnregisterAgentMessage(
[property: MemoryPackOrder(0)] Guid AgentGuid [property: MemoryPackOrder(0)] Guid AgentGuid
) : IMessageToServer { ) : IMessageToController {
public Task<NoReply> Accept(IMessageToServerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleUnregisterAgent(this); return listener.HandleUnregisterAgent(this);
} }
} }

View File

@ -0,0 +1,18 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web.BiDirectional;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToWeb {
public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleReply(this);
}
}

View File

@ -0,0 +1,7 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {}
public interface IMessageToController : IMessageToController<NoReply> {}

View File

@ -0,0 +1,8 @@
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToControllerListener {
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -0,0 +1,7 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {}
public interface IMessageToWeb : IMessageToWeb<NoReply> {}

View File

@ -0,0 +1,8 @@
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public interface IMessageToWebListener {
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,31 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web;
public static class WebMessageRegistries {
public static MessageRegistry<IMessageToWebListener> ToWeb { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToWeb)));
public static MessageRegistry<IMessageToControllerListener> ToController { get; } = new (PhantomLogger.Create("MessageRegistry", nameof(ToController)));
public static IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> Definitions { get; } = new MessageDefinitions();
static WebMessageRegistries() {
ToWeb.Add<ReplyMessage>(127);
ToController.Add<ReplyMessage>(127);
}
private sealed class MessageDefinitions : IMessageDefinitions<IMessageToWebListener, IMessageToControllerListener> {
public MessageRegistry<IMessageToWebListener> Outgoing => ToWeb;
public MessageRegistry<IMessageToControllerListener> Incoming => ToController;
public bool IsRegistrationMessage(Type messageType) {
return false;
}
public IMessage<IMessageToWebListener, NoReply> CreateReplyMessage( uint sequenceId, byte[] serializedReply) {
return new ReplyMessage(sequenceId, serializedReply);
}
}
}

View File

@ -1,7 +0,0 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
public interface IMessageToServer : IMessageToServer<NoReply> {}

View File

@ -1,33 +0,0 @@
using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.BiDirectional;
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
public static class MessageRegistries {
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry:ToAgent"));
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
static MessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
ToAgent.Add<RegisterAgentFailureMessage>(1);
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
ToAgent.Add<ReplyMessage>(127);
ToServer.Add<RegisterAgentMessage>(0);
ToServer.Add<UnregisterAgentMessage>(1);
ToServer.Add<AgentIsAliveMessage>(2);
ToServer.Add<AdvertiseJavaRuntimesMessage>(3);
ToServer.Add<ReportInstanceStatusMessage>(4);
ToServer.Add<InstanceOutputMessage>(5);
ToServer.Add<ReportAgentStatusMessage>(6);
ToServer.Add<ReportInstanceEventMessage>(7);
ToServer.Add<ReplyMessage>(127);
}
}

View File

@ -1,11 +0,0 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsAliveMessage : IMessageToServer {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleAgentIsAlive(this);
}
}

View File

@ -6,7 +6,8 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Messages\Phantom.Common.Messages.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,28 +1,29 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.BiDirectional;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Controller.Rpc; namespace Phantom.Controller.Rpc;
public sealed class RpcClientConnection { public sealed class RpcClientConnection<TListener> {
private readonly ServerSocket socket; private readonly ServerSocket socket;
private readonly uint routingId; private readonly uint routingId;
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker; private readonly MessageReplyTracker messageReplyTracker;
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed; internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
private bool isClosed; private bool isClosed;
internal RpcClientConnection(ServerSocket socket, uint routingId, MessageReplyTracker messageReplyTracker) { internal RpcClientConnection(ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker messageReplyTracker) {
this.socket = socket; this.socket = socket;
this.routingId = routingId; this.routingId = routingId;
this.messageRegistry = messageRegistry;
this.messageReplyTracker = messageReplyTracker; this.messageReplyTracker = messageReplyTracker;
} }
public bool IsSame(RpcClientConnection other) { public bool IsSame(RpcClientConnection<TListener> other) {
return this.routingId == other.routingId; return this.routingId == other.routingId && this.socket == other.socket;
} }
public void Close() { public void Close() {
@ -34,25 +35,25 @@ public sealed class RpcClientConnection {
} }
} }
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent { public async Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
if (isClosed) { if (isClosed) {
return; return;
} }
var bytes = MessageRegistries.ToAgent.Write(message).ToArray(); var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) { if (bytes.Length > 0) {
await socket.SendAsync(routingId, bytes); await socket.SendAsync(routingId, bytes);
} }
} }
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class { public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
if (isClosed) { if (isClosed) {
return null; return null;
} }
var sequenceId = messageReplyTracker.RegisterReply(); var sequenceId = messageReplyTracker.RegisterReply();
var bytes = MessageRegistries.ToAgent.Write<TMessage, TReply>(sequenceId, message).ToArray(); var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) { if (bytes.Length == 0) {
messageReplyTracker.ForgetReply(sequenceId); messageReplyTracker.ForgetReply(sequenceId);
return null; return null;

View File

@ -1,122 +0,0 @@
using NetMQ.Sockets;
using Phantom.Common.Messages;
using Phantom.Common.Messages.BiDirectional;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events;
namespace Phantom.Controller.Rpc;
public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
public static Task Launch(RpcConfiguration config, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
return new RpcLauncher(config, socket, listenerFactory, cancellationToken).Launch();
}
private readonly RpcConfiguration config;
private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory;
private readonly CancellationToken cancellationToken;
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, socket) {
this.config = config;
this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken;
}
protected override void Connect(ServerSocket socket) {
var logger = config.RuntimeLogger;
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for agent connections on port {Port}.", config.Port);
}
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
clients.Remove(e.RoutingId);
logger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
}
while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) {
LogMessageType(logger, routingId, data);
continue;
}
if (!clients.TryGetValue(routingId, out var client)) {
if (!CheckIsAgentRegistrationMessage(data, logger, routingId)) {
continue;
}
var connection = new RpcClientConnection(socket, routingId, replyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(connection, listenerFactory, logger, taskManager, cancellationToken);
clients[routingId] = client;
}
LogMessageType(logger, routingId, data);
MessageRegistries.ToServer.Handle(data, client);
client.CloseIfDisposed();
}
foreach (var client in clients.Values) {
client.Connection.Closed -= OnConnectionClosed;
}
}
private static void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return;
}
if (data.Length > 0 && MessageRegistries.ToServer.TryGetType(data, out var type)) {
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", type.Name, data.Length, routingId);
}
else {
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
}
}
private static bool CheckIsAgentRegistrationMessage(ReadOnlyMemory<byte> data, ILogger logger, uint routingId) {
if (MessageRegistries.ToServer.TryGetType(data, out var type) && type == typeof(RegisterAgentMessage)) {
return true;
}
logger.Warning("Received {MessageType} from a non-registered agent {RoutingId}.", type?.Name ?? "unknown message", routingId);
return false;
}
private sealed class Client : MessageHandler<IMessageToServerListener> {
public RpcClientConnection Connection { get; }
public Client(RpcClientConnection connection, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listenerFactory(connection), logger, taskManager, cancellationToken) {
Connection = connection;
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(new ReplyMessage(sequenceId, serializedReply));
}
public void CloseIfDisposed() {
if (Listener.IsDisposed) {
Connection.Close();
}
}
}
}

View File

@ -0,0 +1,126 @@
using NetMQ.Sockets;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events;
namespace Phantom.Controller.Rpc;
public static class RpcRuntime {
public static Task Launch<TOutgoingListener, TIncomingListener>(RpcConfiguration config, IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions, Func<RpcClientConnection<TOutgoingListener>, TIncomingListener> listenerFactory, CancellationToken cancellationToken) {
return RpcRuntime<TOutgoingListener, TIncomingListener>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
}
}
internal sealed class RpcRuntime<TOutgoingListener, TIncomingListener> : RpcRuntime<ServerSocket> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions, Func<RpcClientConnection<TOutgoingListener>, TIncomingListener> listenerFactory, CancellationToken cancellationToken) {
return new RpcRuntime<TOutgoingListener, TIncomingListener>(config, messageDefinitions, listenerFactory, cancellationToken).Launch();
}
private static ServerSocket CreateSocket(RpcConfiguration config) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
return socket;
}
private readonly RpcConfiguration config;
private readonly IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions;
private readonly Func<RpcClientConnection<TOutgoingListener>, TIncomingListener> listenerFactory;
private readonly CancellationToken cancellationToken;
private RpcRuntime(RpcConfiguration config, IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions, Func<RpcClientConnection<TOutgoingListener>, TIncomingListener> listenerFactory, CancellationToken cancellationToken) : base(config, CreateSocket(config)) {
this.config = config;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken;
}
protected override void Connect(ServerSocket socket) {
var logger = config.RuntimeLogger;
var url = config.TcpUrl;
logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url);
logger.Information("ZeroMQ server initialized, listening for connections on port {Port}.", config.Port);
}
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.RuntimeLogger;
var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
clients.Remove(e.RoutingId);
logger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
}
while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) {
LogMessageType(logger, routingId, data);
continue;
}
if (!clients.TryGetValue(routingId, out var client)) {
if (!CheckIsRegistrationMessage(data, logger, routingId)) {
continue;
}
var connection = new RpcClientConnection<TOutgoingListener>(socket, routingId, messageDefinitions.Outgoing, replyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(connection, messageDefinitions, listenerFactory(connection), logger, taskManager, cancellationToken);
clients[routingId] = client;
}
LogMessageType(logger, routingId, data);
messageDefinitions.Incoming.Handle(data, client);
}
foreach (var client in clients.Values) {
client.Connection.Closed -= OnConnectionClosed;
}
}
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return;
}
if (data.Length > 0 && messageDefinitions.Incoming.TryGetType(data, out var type)) {
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", type.Name, data.Length, routingId);
}
else {
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
}
}
private bool CheckIsRegistrationMessage(ReadOnlyMemory<byte> data, ILogger logger, uint routingId) {
if (messageDefinitions.Incoming.TryGetType(data, out var type) && messageDefinitions.IsRegistrationMessage(type)) {
return true;
}
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", type?.Name ?? "unknown message", routingId);
return false;
}
private sealed class Client : MessageHandler<TIncomingListener> {
public RpcClientConnection<TOutgoingListener> Connection { get; }
private readonly IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions;
public Client(RpcClientConnection<TOutgoingListener> connection, IMessageDefinitions<TOutgoingListener, TIncomingListener> messageDefinitions, TIncomingListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.Connection = connection;
this.messageDefinitions = messageDefinitions;
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
}
}

View File

@ -1,16 +1,16 @@
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Controller.Rpc; using Phantom.Controller.Rpc;
namespace Phantom.Controller.Services.Agents; namespace Phantom.Controller.Services.Agents;
sealed class AgentConnection { sealed class AgentConnection {
private readonly RpcClientConnection connection; private readonly RpcClientConnection<IMessageToAgentListener> connection;
internal AgentConnection(RpcClientConnection connection) { internal AgentConnection(RpcClientConnection<IMessageToAgentListener> connection) {
this.connection = connection; this.connection = connection;
} }
public bool IsSame(RpcClientConnection connection) { public bool IsSame(RpcClientConnection<IMessageToAgentListener> connection) {
return this.connection.IsSame(connection); return this.connection.IsSame(connection);
} }

View File

@ -3,8 +3,8 @@ using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Rpc; using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
@ -52,7 +52,7 @@ public sealed class AgentManager {
return agents.ByGuid.ToImmutable(); return agents.ByGuid.ToImmutable();
} }
internal async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, InstanceManager instanceManager, RpcClientConnection connection) { internal async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, InstanceManager instanceManager, RpcClientConnection<IMessageToAgentListener> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) { if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
return false; return false;
@ -88,7 +88,7 @@ public sealed class AgentManager {
return true; return true;
} }
internal bool UnregisterAgent(Guid agentGuid, RpcClientConnection connection) { internal bool UnregisterAgent(Guid agentGuid, RpcClientConnection<IMessageToAgentListener> connection) {
if (agents.ByGuid.TryReplaceIf(agentGuid, static oldAgent => oldAgent.AsOffline(), oldAgent => oldAgent.Connection?.IsSame(connection) == true)) { if (agents.ByGuid.TryReplaceIf(agentGuid, static oldAgent => oldAgent.AsOffline(), oldAgent => oldAgent.Connection?.IsSame(connection) == true)) {
Logger.Information("Unregistered agent with GUID {Guid}.", agentGuid); Logger.Information("Unregistered agent with GUID {Guid}.", agentGuid);
return true; return true;

View File

@ -1,5 +1,7 @@
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Minecraft; using Phantom.Controller.Minecraft;
using Phantom.Controller.Rpc; using Phantom.Controller.Rpc;
@ -51,8 +53,12 @@ public sealed class ControllerServices {
this.cancellationToken = shutdownCancellationToken; this.cancellationToken = shutdownCancellationToken;
} }
public MessageToServerListener CreateMessageToServerListener(RpcClientConnection connection) { public AgentMessageListener CreateAgentMessageListener(RpcClientConnection<IMessageToAgentListener> connection) {
return new MessageToServerListener(connection, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, EventLog, cancellationToken); return new AgentMessageListener(connection, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, EventLog, cancellationToken);
}
public WebMessageListener CreateWebMessageListener(RpcClientConnection<IMessageToWebListener> connection) {
return new WebMessageListener(connection);
} }
public async Task Initialize() { public async Task Initialize() {

View File

@ -5,8 +5,8 @@ using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities; using Phantom.Controller.Database.Entities;
using Phantom.Controller.Minecraft; using Phantom.Controller.Minecraft;

View File

@ -1,9 +1,9 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Messages; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.ToServer; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Rpc; using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Agents; using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events; using Phantom.Controller.Services.Events;
@ -13,8 +13,8 @@ using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services.Rpc; namespace Phantom.Controller.Services.Rpc;
public sealed class MessageToServerListener : IMessageToServerListener { public sealed class AgentMessageListener : IMessageToControllerListener {
private readonly RpcClientConnection connection; private readonly RpcClientConnection<IMessageToAgentListener> connection;
private readonly AgentManager agentManager; private readonly AgentManager agentManager;
private readonly AgentJavaRuntimesManager agentJavaRuntimesManager; private readonly AgentJavaRuntimesManager agentJavaRuntimesManager;
private readonly InstanceManager instanceManager; private readonly InstanceManager instanceManager;
@ -24,9 +24,7 @@ public sealed class MessageToServerListener : IMessageToServerListener {
private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>(); private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>();
public bool IsDisposed { get; private set; } internal AgentMessageListener(RpcClientConnection<IMessageToAgentListener> connection, AgentManager agentManager, AgentJavaRuntimesManager agentJavaRuntimesManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, EventLog eventLog, CancellationToken cancellationToken) {
internal MessageToServerListener(RpcClientConnection connection, AgentManager agentManager, AgentJavaRuntimesManager agentJavaRuntimesManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, EventLog eventLog, CancellationToken cancellationToken) {
this.connection = connection; this.connection = connection;
this.agentManager = agentManager; this.agentManager = agentManager;
this.agentJavaRuntimesManager = agentJavaRuntimesManager; this.agentJavaRuntimesManager = agentJavaRuntimesManager;
@ -53,12 +51,11 @@ public sealed class MessageToServerListener : IMessageToServerListener {
} }
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) { public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
IsDisposed = true;
if (agentManager.UnregisterAgent(message.AgentGuid, connection)) { if (agentManager.UnregisterAgent(message.AgentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline); instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline);
} }
connection.Close();
return Task.FromResult(NoReply.Instance); return Task.FromResult(NoReply.Instance);
} }

View File

@ -0,0 +1,18 @@
using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Controller.Rpc;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Controller.Services.Rpc;
public sealed class WebMessageListener : IMessageToControllerListener {
private readonly RpcClientConnection<IMessageToWebListener> connection;
internal WebMessageListener(RpcClientConnection<IMessageToWebListener> connection) {
this.connection = connection;
}
public Task<NoReply> HandleReply(ReplyMessage message) {
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -1,5 +1,7 @@
using System.Reflection; using System.Reflection;
using Phantom.Common.Logging; using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web;
using Phantom.Controller; using Phantom.Controller;
using Phantom.Controller.Database.Postgres; using Phantom.Controller.Database.Postgres;
using Phantom.Controller.Rpc; using Phantom.Controller.Rpc;
@ -55,14 +57,15 @@ try {
await controllerServices.Initialize(); await controllerServices.Initialize();
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc", "Agent"), PhantomLogger.Create<TaskManager>("Rpc"), agentRpcServerHost, agentRpcServerPort, agentKeyData.Certificate); static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
var rpcTask = RpcLauncher.Launch(rpcConfiguration, controllerServices.CreateMessageToServerListener, shutdownCancellationToken); return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
try {
await rpcTask.WaitAsync(shutdownCancellationToken);
} finally {
await rpcTask;
} }
await Task.WhenAll(
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
);
return 0; return 0;
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
return 0; return 0;

View File

@ -28,7 +28,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data.Tests",
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Logging", "Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj", "{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Logging", "Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj", "{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages", "Common\Phantom.Common.Messages\Phantom.Common.Messages.csproj", "{95B55357-F8F0-48C2-A1C2-5EA997651783}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Agent", "Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj", "{95B55357-F8F0-48C2-A1C2-5EA997651783}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Web", "Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj", "{6E798DEB-8921-41A2-8AFB-E4416A9E0704}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Controller", "Controller\Phantom.Controller\Phantom.Controller.csproj", "{A0F1C595-96B6-4DBF-8C16-6B99223F8F35}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Controller", "Controller\Phantom.Controller\Phantom.Controller.csproj", "{A0F1C595-96B6-4DBF-8C16-6B99223F8F35}"
EndProject EndProject
@ -96,6 +98,10 @@ Global
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.Build.0 = Debug|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.Build.0 = Debug|Any CPU
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.ActiveCfg = Release|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.ActiveCfg = Release|Any CPU
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.Build.0 = Release|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.Build.0 = Release|Any CPU
{6E798DEB-8921-41A2-8AFB-E4416A9E0704}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E798DEB-8921-41A2-8AFB-E4416A9E0704}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E798DEB-8921-41A2-8AFB-E4416A9E0704}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E798DEB-8921-41A2-8AFB-E4416A9E0704}.Release|Any CPU.Build.0 = Release|Any CPU
{A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Debug|Any CPU.Build.0 = Debug|Any CPU {A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Release|Any CPU.ActiveCfg = Release|Any CPU {A0F1C595-96B6-4DBF-8C16-6B99223F8F35}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -157,6 +163,7 @@ Global
{6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {D7F55010-B3ED-42A5-8D83-E754FFC5F2A2} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{6E798DEB-8921-41A2-8AFB-E4416A9E0704} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{435D7981-DFDA-46A0-8CD8-CD8C117935D7} = {D781E00D-8563-4102-A0CD-477A679193B5} {435D7981-DFDA-46A0-8CD8-CD8C117935D7} = {D781E00D-8563-4102-A0CD-477A679193B5}
{A0F1C595-96B6-4DBF-8C16-6B99223F8F35} = {0AB9471E-6228-4EB7-802E-3102B3952AAD} {A0F1C595-96B6-4DBF-8C16-6B99223F8F35} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{E3AD566F-384A-489A-A3BB-EA3BA400C18C} = {0AB9471E-6228-4EB7-802E-3102B3952AAD} {E3AD566F-384A-489A-A3BB-EA3BA400C18C} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}

View File

@ -93,8 +93,8 @@ Use volumes to persist either the whole `/data` folder, or just `/data/data` if
- `CONTROLLER_HOST` is the hostname of the Controller. - `CONTROLLER_HOST` is the hostname of the Controller.
- `CONTROLLER_PORT` is the Agent RPC port of the Controller. Default: `9401` - `CONTROLLER_PORT` is the Agent RPC port of the Controller. Default: `9401`
- `AGENT_NAME` is the display name of the Agent. Emoji are allowed. - `AGENT_NAME` is the display name of the Agent. Emoji are allowed.
- `AGENT_KEY` is the plaintext-encoded version of [Agent Key](#agent-key). - `AGENT_KEY` is the plaintext-encoded version of [Agent Key](#agent--web-keys).
- `AGENT_KEY_FILE` is a path to the [Agent Key](#agent-key) binary file. - `AGENT_KEY_FILE` is a path to the [Agent Key](#agent--web-keys) binary file.
* **Agent Configuration** * **Agent Configuration**
- `MAX_INSTANCES` is the number of instances that can be created. - `MAX_INSTANCES` is the number of instances that can be created.
- `MAX_MEMORY` is the maximum amount of RAM that can be distributed among all instances. Use a positive integer with an optional suffix 'M' for MB, or 'G' for GB. Examples: `4096M`, `16G` - `MAX_MEMORY` is the maximum amount of RAM that can be distributed among all instances. Use a positive integer with an optional suffix 'M' for MB, or 'G' for GB. Examples: `4096M`, `16G`
@ -114,7 +114,7 @@ Use volumes to persist the whole `/data` folder.
* **Controller Communication** * **Controller Communication**
- `CONTROLLER_HOST` is the hostname of the Controller. - `CONTROLLER_HOST` is the hostname of the Controller.
- `CONTROLLER_PORT` is the Web RPC port of the Controller. - `CONTROLLER_PORT` is the Web RPC port of the Controller. Default: `9402`
- `WEB_KEY` is the plaintext-encoded version of [Web Key](#agent--web-keys). - `WEB_KEY` is the plaintext-encoded version of [Web Key](#agent--web-keys).
- `WEB_KEY_FILE` is a path to the [Web Key](#agent--web-keys) binary file. - `WEB_KEY_FILE` is a path to the [Web Key](#agent--web-keys) binary file.
* **Web Server** * **Web Server**
@ -124,7 +124,7 @@ Use volumes to persist the whole `/data` folder.
## Logging ## Logging
Both the Server and Agent support a `LOG_LEVEL` environment variable to set the minimum log level. Possible values: All services support a `LOG_LEVEL` environment variable to set the minimum log level. Possible values:
* `VERBOSE` * `VERBOSE`
* `DEBUG` * `DEBUG`
@ -146,7 +146,8 @@ The repository includes a [Rider](https://www.jetbrains.com/rider/) projects wit
- Database: `postgres` - Database: `postgres`
2. Install one or more Java versions into the `~/.jdks` folder (`%USERPROFILE%\.jdks` on Windows). 2. Install one or more Java versions into the `~/.jdks` folder (`%USERPROFILE%\.jdks` on Windows).
3. Open the project in [Rider](https://www.jetbrains.com/rider/) and use one of the provided run configurations: 3. Open the project in [Rider](https://www.jetbrains.com/rider/) and use one of the provided run configurations:
- `Server` starts the Server. - `Controller` starts the Controller.
- `Web` starts the Web server.
- `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents. - `Agent 1`, `Agent 2`, `Agent 3` start one of the Agents.
- `Server + Agent` starts the Server and Agent 1. - `Server + Agent` starts the Server and Agent 1.
- `Server + Agent x3` starts the Server and Agent 1, 2, and 3. - `Server + Agent x3` starts the Server and Agent 1, 2, and 3.

View File

@ -0,0 +1,9 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessageDefinitions<TOutgoingListener, TIncomingListener> {
MessageRegistry<TOutgoingListener> Outgoing { get; }
MessageRegistry<TIncomingListener> Incoming { get; }
bool IsRegistrationMessage(Type messageType);
IMessage<TOutgoingListener, NoReply> CreateReplyMessage(uint sequenceId, byte[] serializedReply);
}

View File

@ -6,16 +6,6 @@ using Serilog;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
static class RpcRuntime { static class RpcRuntime {
private static bool HasRuntime { get; set; }
internal static void MarkRuntimeCreated() {
if (HasRuntime) {
throw new InvalidOperationException("Only one instance of RpcRuntime can be created.");
}
HasRuntime = true;
}
internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) { internal static void SetDefaultSocketOptions(ThreadSafeSocketOptions options) {
// TODO test behavior when either agent or server are offline for a very long time // TODO test behavior when either agent or server are offline for a very long time
options.DelayAttachOnConnect = true; options.DelayAttachOnConnect = true;
@ -24,14 +14,13 @@ static class RpcRuntime {
} }
} }
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() { public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
private readonly TSocket socket; private readonly TSocket socket;
private readonly ILogger runtimeLogger; private readonly ILogger runtimeLogger;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
protected RpcRuntime(RpcConfiguration configuration, TSocket socket) { protected RpcRuntime(RpcConfiguration configuration, TSocket socket) {
RpcRuntime.MarkRuntimeCreated();
RpcRuntime.SetDefaultSocketOptions(socket.Options); RpcRuntime.SetDefaultSocketOptions(socket.Options);
this.socket = socket; this.socket = socket;
this.runtimeLogger = configuration.RuntimeLogger; this.runtimeLogger = configuration.RuntimeLogger;

View File

@ -4,12 +4,22 @@ using Phantom.Utils.Runtime;
namespace Phantom.Web; namespace Phantom.Web;
sealed record Variables( sealed record Variables(
string ControllerHost,
ushort ControllerPort,
string? WebKeyToken,
string? WebKeyFilePath,
string WebServerHost, string WebServerHost,
ushort WebServerPort, ushort WebServerPort,
string WebBasePath string WebBasePath
) { ) {
private static Variables LoadOrThrow() { private static Variables LoadOrThrow() {
var (webKeyToken, webKeyFilePath) = EnvironmentVariables.GetEitherString("WEB_KEY", "WEB_KEY_FILE").Require;
return new Variables( return new Variables(
EnvironmentVariables.GetString("CONTROLLER_HOST").Require,
EnvironmentVariables.GetPortNumber("CONTROLLER_PORT").WithDefault(9402),
webKeyToken,
webKeyFilePath,
EnvironmentVariables.GetString("WEB_SERVER_HOST").WithDefault("0.0.0.0"), EnvironmentVariables.GetString("WEB_SERVER_HOST").WithDefault("0.0.0.0"),
EnvironmentVariables.GetPortNumber("WEB_SERVER_PORT").WithDefault(9400), EnvironmentVariables.GetPortNumber("WEB_SERVER_PORT").WithDefault(9400),
EnvironmentVariables.GetString("WEB_BASE_PATH").Validate(static value => value.StartsWith('/') && value.EndsWith('/'), "Environment variable must begin and end with '/'").WithDefault("/") EnvironmentVariables.GetString("WEB_BASE_PATH").Validate(static value => value.StartsWith('/') && value.EndsWith('/'), "Environment variable must begin and end with '/'").WithDefault("/")