1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-18 15:42:50 +02:00

Compare commits

...

2 Commits

96 changed files with 687 additions and 486 deletions

View File

@ -1,7 +1,7 @@
using System.Diagnostics; using System.Diagnostics;
using Phantom.Common.Data.Java; using Phantom.Common.Data.Java;
using Phantom.Common.Logging;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Minecraft.Java; namespace Phantom.Agent.Minecraft.Java;

View File

@ -11,8 +11,8 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,8 +1,8 @@
using System.Security.Cryptography; using System.Security.Cryptography;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Serilog; using Serilog;

View File

@ -1,7 +1,7 @@
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Logging;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Minecraft.Server; namespace Phantom.Agent.Minecraft.Server;

View File

@ -3,7 +3,7 @@ using System.Buffers.Binary;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Minecraft.Server; namespace Phantom.Agent.Minecraft.Server;

View File

@ -1,6 +1,6 @@
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Utils.Rpc; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog; using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;

View File

@ -1,7 +1,7 @@
using Phantom.Common.Logging; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog; using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
@ -26,7 +26,7 @@ sealed class KeepAliveLoop {
try { try {
while (true) { while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken); await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()); await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
} }
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // Ignore.

View File

@ -1,33 +1,31 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Common.Data.Agent;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> { public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, AgentInfo agentInfo, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) { public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch(); return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
} }
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {} private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, ILogger logger, TaskManager taskManager) { protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection) {
var keepAliveLoop = new KeepAliveLoop(connection); var keepAliveLoop = new KeepAliveLoop(connection);
try { try {
base.RunWithConnection(socket, connection, logger, taskManager); await base.RunWithConnection(socket, connection);
} finally { } finally {
keepAliveLoop.Cancel(); keepAliveLoop.Cancel();
} }
} }
protected override async Task Disconnect(ClientSocket socket, ILogger logger) { protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray(); var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
try { try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None); await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);

View File

@ -1,5 +1,5 @@
using Phantom.Common.Logging; using Phantom.Utils.IO;
using Phantom.Utils.IO; using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Services; namespace Phantom.Agent.Services;

View File

@ -3,7 +3,7 @@ using Phantom.Agent.Rpc;
using Phantom.Agent.Services.Backups; using Phantom.Agent.Services.Backups;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;

View File

@ -3,8 +3,8 @@ using System.Diagnostics.CodeAnalysis;
using System.Formats.Tar; using System.Formats.Tar;
using Phantom.Agent.Minecraft.Instance; using Phantom.Agent.Minecraft.Instance;
using Phantom.Common.Data.Backups; using Phantom.Common.Data.Backups;
using Phantom.Common.Logging;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Backups; namespace Phantom.Agent.Services.Backups;

View File

@ -1,4 +1,4 @@
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Processes; using Phantom.Utils.Processes;
using Serilog; using Serilog;

View File

@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Instance; using Phantom.Agent.Minecraft.Instance;
using Phantom.Common.Data.Backups; using Phantom.Common.Data.Backups;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Backups; namespace Phantom.Agent.Services.Backups;

View File

@ -3,7 +3,7 @@ using Phantom.Agent.Minecraft.Server;
using Phantom.Agent.Services.Instances; using Phantom.Agent.Services.Instances;
using Phantom.Agent.Services.Instances.Procedures; using Phantom.Agent.Services.Instances.Procedures;
using Phantom.Common.Data.Backups; using Phantom.Common.Data.Backups;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Utils.Threading; using Phantom.Utils.Threading;

View File

@ -4,8 +4,8 @@ using Phantom.Agent.Services.Instances.States;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
@ -21,8 +21,7 @@ sealed class Instance : IAsyncDisposable {
private readonly ILogger logger; private readonly ILogger logger;
private IInstanceStatus currentStatus; private IInstanceStatus currentStatus;
private int statusUpdateCounter;
private IInstanceState currentState; private IInstanceState currentState;
public bool IsRunning => currentState is not InstanceNotRunningState; public bool IsRunning => currentState is not InstanceNotRunningState;
@ -38,40 +37,23 @@ sealed class Instance : IAsyncDisposable {
this.Configuration = configuration; this.Configuration = configuration;
this.Launcher = launcher; this.Launcher = launcher;
this.currentState = new InstanceNotRunningState();
this.currentStatus = InstanceStatus.NotRunning; this.currentStatus = InstanceStatus.NotRunning;
this.currentState = new InstanceNotRunningState();
this.procedureManager = new InstanceProcedureManager(this, new Context(this), services.TaskManager); this.procedureManager = new InstanceProcedureManager(this, new Context(this), services.TaskManager);
} }
private void TryUpdateStatus(string taskName, Func<Task> getUpdateTask) {
int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter);
Services.TaskManager.Run(taskName, async () => {
if (myStatusUpdateCounter == statusUpdateCounter) {
await getUpdateTask();
}
});
}
public void ReportLastStatus() { public void ReportLastStatus() {
TryUpdateStatus("Report last status of instance " + shortName, async () => { Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
});
} }
private void ReportAndSetStatus(IInstanceStatus status) { private void ReportAndSetStatus(IInstanceStatus status) {
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => { currentStatus = status;
if (status != currentStatus) { Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
currentStatus = status;
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
}
});
} }
private void ReportEvent(IInstanceEvent instanceEvent) { private void ReportEvent(IInstanceEvent instanceEvent) {
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent); Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent));
Services.TaskManager.Run("Report event for instance " + shortName, async () => await Services.ControllerConnection.Send(message));
} }
internal void TransitionState(IInstanceState newState) { internal void TransitionState(IInstanceState newState) {

View File

@ -1,8 +1,8 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Threading.Channels; using System.Threading.Channels;
using Phantom.Agent.Rpc; using Phantom.Agent.Rpc;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Agent.Services.Instances; namespace Phantom.Agent.Services.Instances;
@ -36,14 +36,14 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
try { try {
while (await lineReader.WaitToReadAsync(CancellationToken)) { while (await lineReader.WaitToReadAsync(CancellationToken)) {
await Task.Delay(SendDelay, CancellationToken); await Task.Delay(SendDelay, CancellationToken);
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder)); SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
} }
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // Ignore.
} }
// Flush remaining lines. // Flush remaining lines.
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder)); SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
} }
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) { private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
@ -61,9 +61,9 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
return builder.ToImmutable(); return builder.ToImmutable();
} }
private async Task SendOutputToServer(ImmutableArray<string> lines) { private void SendOutputToServer(ImmutableArray<string> lines) {
if (!lines.IsEmpty) { if (!lines.IsEmpty) {
await controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines)); controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
} }
} }

View File

@ -13,9 +13,9 @@ using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;

View File

@ -1,12 +1,12 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Serilog; using Serilog;
namespace Phantom.Agent.Services.Rpc; namespace Phantom.Agent.Services.Rpc;

View File

@ -1,8 +1,8 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent; namespace Phantom.Agent;

View File

@ -1,6 +1,6 @@
using System.Text; using System.Text;
using Phantom.Common.Logging;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Agent; namespace Phantom.Agent;

View File

@ -5,13 +5,12 @@ using Phantom.Agent.Rpc;
using Phantom.Agent.Services; using Phantom.Agent.Services;
using Phantom.Agent.Services.Rpc; using Phantom.Agent.Services.Rpc;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks;
const int ProtocolVersion = 1; const int ProtocolVersion = 1;
@ -52,7 +51,7 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate); var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo)); var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection)); var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
@ -60,7 +59,7 @@ try {
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource); var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
var rpcTask = RpcClientRuntime.Launch(rpcSocket, agentInfo, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken); var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
await rpcTask.WaitAsync(shutdownCancellationToken); await rpcTask.WaitAsync(shutdownCancellationToken);
} finally { } finally {

View File

@ -1,6 +1,6 @@
using Phantom.Agent.Minecraft.Java; using Phantom.Agent.Minecraft.Java;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
namespace Phantom.Agent; namespace Phantom.Agent;

View File

@ -1,8 +1,8 @@
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Agent; namespace Phantom.Common.Messages.Agent;

View File

@ -8,6 +8,11 @@ public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId, [property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply [property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToController, IMessageToAgent, IReply { ) : IMessageToController, IMessageToAgent, IReply {
private static readonly MessageQueueKey MessageQueueKey = new ("Reply");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleReply(this); return listener.HandleReply(this);
} }

View File

@ -2,6 +2,11 @@
namespace Phantom.Common.Messages.Agent; namespace Phantom.Common.Messages.Agent;
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {} public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {
MessageQueueKey IMessage<IMessageToAgentListener, TReply>.QueueKey => IMessageToAgent.DefaultQueueKey;
}
public interface IMessageToAgent : IMessageToAgent<NoReply> {} public interface IMessageToAgent : IMessageToAgent<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToAgentListener, NoReply>.QueueKey => DefaultQueueKey;
}

View File

@ -2,6 +2,11 @@
namespace Phantom.Common.Messages.Agent; namespace Phantom.Common.Messages.Agent;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {} public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {} public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}

View File

@ -6,8 +6,8 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -9,6 +9,11 @@ public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid, [property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines [property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToController { ) : IMessageToController {
private static readonly MessageQueueKey MessageQueueKey = new ("Agent.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToControllerListener listener) { public Task<NoReply> Accept(IMessageToControllerListener listener) {
return listener.HandleInstanceOutput(this); return listener.HandleInstanceOutput(this);
} }

View File

@ -2,6 +2,11 @@
namespace Phantom.Common.Messages.Web; namespace Phantom.Common.Messages.Web;
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {} public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
}
public interface IMessageToController : IMessageToController<NoReply> {} public interface IMessageToController : IMessageToController<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
}

View File

@ -2,6 +2,11 @@
namespace Phantom.Common.Messages.Web; namespace Phantom.Common.Messages.Web;
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {} public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {
MessageQueueKey IMessage<IMessageToWebListener, TReply>.QueueKey => IMessageToWeb.DefaultQueueKey;
}
public interface IMessageToWeb : IMessageToWeb<NoReply> {} public interface IMessageToWeb : IMessageToWeb<NoReply> {
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
MessageQueueKey IMessage<IMessageToWebListener, NoReply>.QueueKey => DefaultQueueKey;
}

View File

@ -6,9 +6,9 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" /> <ProjectReference Include="..\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -9,6 +9,11 @@ public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid, [property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines [property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToWeb { ) : IMessageToWeb {
private static readonly MessageQueueKey MessageQueueKey = new ("Web.InstanceOutput");
[MemoryPackIgnore]
public MessageQueueKey QueueKey => MessageQueueKey;
public Task<NoReply> Accept(IMessageToWebListener listener) { public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleInstanceOutput(this); return listener.HandleInstanceOutput(this);
} }

View File

@ -6,10 +6,10 @@ using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog; using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb; using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.Web; namespace Phantom.Common.Messages.Web;

View File

@ -1,5 +1,5 @@
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;

View File

@ -17,7 +17,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -2,9 +2,9 @@
using System.Net.Http.Json; using System.Net.Http.Json;
using System.Text.Json; using System.Text.Json;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Serilog; using Serilog;

View File

@ -1,7 +1,7 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics; using System.Diagnostics;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Minecraft; namespace Phantom.Controller.Minecraft;

View File

@ -7,8 +7,8 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,81 +0,0 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Controller.Rpc;
public sealed class RpcConnectionToClient<TListener> {
private readonly ServerSocket socket;
private readonly uint routingId;
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker messageReplyTracker;
private volatile bool isAuthorized;
public bool IsAuthorized {
get => isAuthorized;
set => isAuthorized = value;
}
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
public bool IsClosed { get; private set; }
internal RpcConnectionToClient(ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker messageReplyTracker) {
this.socket = socket;
this.routingId = routingId;
this.messageRegistry = messageRegistry;
this.messageReplyTracker = messageReplyTracker;
}
public bool IsSame(RpcConnectionToClient<TListener> other) {
return this.routingId == other.routingId && this.socket == other.socket;
}
public void Close() {
bool hasClosed = false;
lock (this) {
if (!IsClosed) {
IsClosed = true;
hasClosed = true;
}
}
if (hasClosed) {
Closed?.Invoke(this, new RpcClientConnectionClosedEventArgs(routingId));
}
}
public async Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
if (IsClosed) {
return;
}
var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) {
await socket.SendAsync(routingId, bytes);
}
}
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
if (IsClosed) {
return null;
}
var sequenceId = messageReplyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
messageReplyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(routingId, bytes);
return await messageReplyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public void Receive(IReply message) {
messageReplyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
}

View File

@ -1,112 +0,0 @@
using NetMQ.Sockets;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog;
using Serilog.Events;
namespace Phantom.Controller.Rpc;
public static class RpcRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
}
}
internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
}
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly CancellationToken cancellationToken;
private RpcRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken;
}
protected override void Run(ServerSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
clients.Remove(e.RoutingId);
logger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
}
while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) {
LogMessageType(logger, routingId, data, messageType: null);
continue;
}
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (!clients.TryGetValue(routingId, out var client)) {
if (!CheckIsRegistrationMessage(messageType, logger, routingId)) {
continue;
}
var connection = new RpcConnectionToClient<TClientListener>(socket, routingId, messageDefinitions.ToClient, replyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(connection, messageDefinitions, listenerFactory(connection), logger, taskManager, cancellationToken);
clients[routingId] = client;
}
if (!client.Connection.IsAuthorized && !CheckIsRegistrationMessage(messageType, logger, routingId)) {
continue;
}
LogMessageType(logger, routingId, data, messageType);
messageDefinitions.ToServer.Handle(data, client);
}
foreach (var client in clients.Values) {
client.Connection.Closed -= OnConnectionClosed;
}
}
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return;
}
if (data.Length > 0 && messageType != null) {
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
}
else {
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
}
}
private bool CheckIsRegistrationMessage(Type? messageType, ILogger logger, uint routingId) {
if (messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
return true;
}
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", messageType?.Name ?? "unknown message", routingId);
return false;
}
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
this.Connection = connection;
this.messageDefinitions = messageDefinitions;
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
}
}

View File

@ -1,5 +1,5 @@
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Controller.Rpc; using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Controller.Services.Agents; namespace Phantom.Controller.Services.Agents;
@ -22,7 +22,7 @@ sealed class AgentConnection {
return connection.Send(message); return connection.Send(message);
} }
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class { public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken); return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
} }
} }

View File

@ -2,14 +2,14 @@
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Events; using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
@ -126,12 +126,17 @@ sealed class AgentManager {
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class { internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null; var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
if (connection == null) { if (connection == null || agent == null) {
// TODO handle missing agent? // TODO handle missing agent?
return null; return null;
} }
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken); try {
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
} catch (Exception e) {
Logger.Error(e, "Could not send message to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
return null;
}
} }
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> { private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {

View File

@ -1,15 +1,15 @@
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Minecraft; using Phantom.Controller.Minecraft;
using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Agents; using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events; using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc; using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users; using Phantom.Controller.Services.Users;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services; namespace Phantom.Controller.Services;

View File

@ -6,7 +6,6 @@ using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft; using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
@ -16,6 +15,7 @@ using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents; using Phantom.Controller.Services.Agents;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Events; using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Instances; namespace Phantom.Controller.Services.Instances;

View File

@ -4,11 +4,11 @@ using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Common.Messages.Agent.ToController; using Phantom.Common.Messages.Agent.ToController;
using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Agents; using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events; using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services.Rpc; namespace Phantom.Controller.Services.Rpc;
@ -36,10 +36,11 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) { public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) { if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) {
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
} }
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) { else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
connection.IsAuthorized = true; connection.SetAuthorizationResult(true);
agentGuidWaiter.SetResult(message.AgentInfo.Guid); agentGuidWaiter.SetResult(message.AgentInfo.Guid);
} }

View File

@ -8,18 +8,18 @@ using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog; using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Common.Messages.Web.ToWeb; using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Controller.Minecraft; using Phantom.Controller.Minecraft;
using Phantom.Controller.Rpc;
using Phantom.Controller.Services.Agents; using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events; using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users; using Phantom.Controller.Services.Users;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
@ -108,11 +108,12 @@ public sealed class WebMessageListener : IMessageToControllerListener {
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) { public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
if (authToken.FixedTimeEquals(message.AuthToken)) { if (authToken.FixedTimeEquals(message.AuthToken)) {
Logger.Information("Web authorized successfully."); Logger.Information("Web authorized successfully.");
connection.IsAuthorized = true; connection.SetAuthorizationResult(true);
await connection.Send(new RegisterWebResultMessage(true)); await connection.Send(new RegisterWebResultMessage(true));
} }
else { else {
Logger.Warning("Web failed to authorize, invalid token."); Logger.Warning("Web failed to authorize, invalid token.");
connection.SetAuthorizationResult(false);
await connection.Send(new RegisterWebResultMessage(false)); await connection.Send(new RegisterWebResultMessage(false));
} }

View File

@ -1,10 +1,10 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities; using Phantom.Controller.Database.Entities;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Users; namespace Phantom.Controller.Services.Users;

View File

@ -1,11 +1,11 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities; using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories; using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Users; namespace Phantom.Controller.Services.Users;

View File

@ -1,9 +1,9 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities; using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories; using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Users; namespace Phantom.Controller.Services.Users;

View File

@ -1,8 +1,8 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Database.Repositories; using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Users; namespace Phantom.Controller.Services.Users;

View File

@ -1,8 +1,8 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Controller; namespace Phantom.Controller;

View File

@ -12,7 +12,6 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Controller.Database.Postgres\Phantom.Controller.Database.Postgres.csproj" /> <ProjectReference Include="..\Phantom.Controller.Database.Postgres\Phantom.Controller.Database.Postgres.csproj" />
<ProjectReference Include="..\Phantom.Controller.Minecraft\Phantom.Controller.Minecraft.csproj" /> <ProjectReference Include="..\Phantom.Controller.Minecraft\Phantom.Controller.Minecraft.csproj" />

View File

@ -1,14 +1,14 @@
using System.Reflection; using System.Reflection;
using NetMQ; using NetMQ;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Controller; using Phantom.Controller;
using Phantom.Controller.Database.Postgres; using Phantom.Controller.Database.Postgres;
using Phantom.Controller.Rpc;
using Phantom.Controller.Services; using Phantom.Controller.Services;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
@ -59,15 +59,17 @@ try {
await controllerServices.Initialize(); await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate); return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
} }
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try { try {
await Task.WhenAll( await Task.WhenAll(
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken), RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
); );
} finally { } finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup(); NetMQConfig.Cleanup();
} }

View File

@ -1,5 +1,5 @@
using Npgsql; using Npgsql;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
namespace Phantom.Controller; namespace Phantom.Controller;

View File

@ -28,8 +28,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data.Tests",
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data.Web", "Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj", "{BC969D0B-0019-48E0-9FAF-F5CC906AAF09}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Data.Web", "Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj", "{BC969D0B-0019-48E0-9FAF-F5CC906AAF09}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Logging", "Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj", "{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Agent", "Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj", "{95B55357-F8F0-48C2-A1C2-5EA997651783}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Agent", "Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj", "{95B55357-F8F0-48C2-A1C2-5EA997651783}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Web", "Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj", "{6E798DEB-8921-41A2-8AFB-E4416A9E0704}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Common.Messages.Web", "Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj", "{6E798DEB-8921-41A2-8AFB-E4416A9E0704}"
@ -50,6 +48,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils", "Utils\Phan
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Events", "Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj", "{2E81523B-5DBE-4992-A77B-1679758D0688}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Events", "Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj", "{2E81523B-5DBE-4992-A77B-1679758D0688}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Logging", "Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj", "{FCA141F5-4F18-47C2-9855-14E326FF1219}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Rpc", "Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj", "{BB112660-7A20-45E6-9195-65363B74027F}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Rpc", "Utils\Phantom.Utils.Rpc\Phantom.Utils.Rpc.csproj", "{BB112660-7A20-45E6-9195-65363B74027F}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Tests", "Utils\Phantom.Utils.Tests\Phantom.Utils.Tests.csproj", "{742599E6-2FC2-4B39-85B8-976C98013030}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Tests", "Utils\Phantom.Utils.Tests\Phantom.Utils.Tests.csproj", "{742599E6-2FC2-4B39-85B8-976C98013030}"
@ -96,10 +96,6 @@ Global
{BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Debug|Any CPU.Build.0 = Debug|Any CPU {BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Release|Any CPU.ActiveCfg = Release|Any CPU {BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Release|Any CPU.Build.0 = Release|Any CPU {BC969D0B-0019-48E0-9FAF-F5CC906AAF09}.Release|Any CPU.Build.0 = Release|Any CPU
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2}.Release|Any CPU.Build.0 = Release|Any CPU
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.Build.0 = Debug|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Debug|Any CPU.Build.0 = Debug|Any CPU
{95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.ActiveCfg = Release|Any CPU {95B55357-F8F0-48C2-A1C2-5EA997651783}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -140,6 +136,10 @@ Global
{2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.Build.0 = Release|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.Build.0 = Release|Any CPU
{FCA141F5-4F18-47C2-9855-14E326FF1219}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FCA141F5-4F18-47C2-9855-14E326FF1219}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FCA141F5-4F18-47C2-9855-14E326FF1219}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FCA141F5-4F18-47C2-9855-14E326FF1219}.Release|Any CPU.Build.0 = Release|Any CPU
{BB112660-7A20-45E6-9195-65363B74027F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {BB112660-7A20-45E6-9195-65363B74027F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BB112660-7A20-45E6-9195-65363B74027F}.Debug|Any CPU.Build.0 = Debug|Any CPU {BB112660-7A20-45E6-9195-65363B74027F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BB112660-7A20-45E6-9195-65363B74027F}.Release|Any CPU.ActiveCfg = Release|Any CPU {BB112660-7A20-45E6-9195-65363B74027F}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -167,7 +167,6 @@ Global
{665C7B87-0165-48BC-B6A6-17A3812A70C9} = {F5878792-64C8-4ECF-A075-66341FF97127} {665C7B87-0165-48BC-B6A6-17A3812A70C9} = {F5878792-64C8-4ECF-A075-66341FF97127}
{AEE8B77E-AB07-423F-9981-8CD829ACB834} = {F5878792-64C8-4ECF-A075-66341FF97127} {AEE8B77E-AB07-423F-9981-8CD829ACB834} = {F5878792-64C8-4ECF-A075-66341FF97127}
{6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {6C3DB1E5-F695-4D70-8F3A-78C2957274BE} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{D7F55010-B3ED-42A5-8D83-E754FFC5F2A2} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {95B55357-F8F0-48C2-A1C2-5EA997651783} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{6E798DEB-8921-41A2-8AFB-E4416A9E0704} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18} {6E798DEB-8921-41A2-8AFB-E4416A9E0704} = {01CB1A81-8950-471C-BFDF-F135FDDB2C18}
{435D7981-DFDA-46A0-8CD8-CD8C117935D7} = {D781E00D-8563-4102-A0CD-477A679193B5} {435D7981-DFDA-46A0-8CD8-CD8C117935D7} = {D781E00D-8563-4102-A0CD-477A679193B5}
@ -180,6 +179,7 @@ Global
{90F0F1B1-EB0A-49C9-8DF0-1153A87F77C9} = {0AB9471E-6228-4EB7-802E-3102B3952AAD} {90F0F1B1-EB0A-49C9-8DF0-1153A87F77C9} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{384885E2-5113-45C5-9B15-09BDA0911852} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {384885E2-5113-45C5-9B15-09BDA0911852} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{2E81523B-5DBE-4992-A77B-1679758D0688} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {2E81523B-5DBE-4992-A77B-1679758D0688} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{FCA141F5-4F18-47C2-9855-14E326FF1219} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{BB112660-7A20-45E6-9195-65363B74027F} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {BB112660-7A20-45E6-9195-65363B74027F} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{742599E6-2FC2-4B39-85B8-976C98013030} = {7A3C7B26-26A0-49B9-8505-5F8267C10F10} {742599E6-2FC2-4B39-85B8-976C98013030} = {7A3C7B26-26A0-49B9-8505-5F8267C10F10}
{7CA2E5FE-E507-4DC6-930C-E18711A9F856} = {92B26F48-235F-4500-BD55-800F06A0BA39} {7CA2E5FE-E507-4DC6-930C-E18711A9F856} = {92B26F48-235F-4500-BD55-800F06A0BA39}

View File

@ -1,7 +1,7 @@
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using Serilog.Events; using Serilog.Events;
namespace Phantom.Common.Logging; namespace Phantom.Utils.Logging;
static class DefaultLogLevel { static class DefaultLogLevel {
private const string ENVIRONMENT_VARIABLE = "LOG_LEVEL"; private const string ENVIRONMENT_VARIABLE = "LOG_LEVEL";

View File

@ -1,6 +1,6 @@
using Serilog; using Serilog;
namespace Phantom.Common.Logging; namespace Phantom.Utils.Logging;
public static class LoggerExtensions { public static class LoggerExtensions {
private static readonly string HeadingPadding = new (' ', 23); private static readonly string HeadingPadding = new (' ', 23);

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings> <ImplicitUsings>enable</ImplicitUsings>

View File

@ -4,7 +4,7 @@ using Serilog.Core;
using Serilog.Events; using Serilog.Events;
using Serilog.Sinks.SystemConsole.Themes; using Serilog.Sinks.SystemConsole.Themes;
namespace Phantom.Common.Logging; namespace Phantom.Utils.Logging;
public static class PhantomLogger { public static class PhantomLogger {
public static Logger Root { get; } = CreateLogger("[{Timestamp:HH:mm:ss} {Level:u}] {Message:lj}{NewLine}{Exception}"); public static Logger Root { get; } = CreateLogger("[{Timestamp:HH:mm:ss} {Level:u}] {Message:lj}{NewLine}{Exception}");

View File

@ -1,5 +1,6 @@
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public interface IMessage<TListener, TReply> { public interface IMessage<TListener, TReply> {
MessageQueueKey QueueKey { get; }
Task<TReply> Accept(TListener listener); Task<TReply> Accept(TListener listener);
} }

View File

@ -1,38 +1,41 @@
using Phantom.Utils.Tasks; using Phantom.Utils.Logging;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public abstract class MessageHandler<TListener> { abstract class MessageHandler<TListener> {
protected ILogger Logger { get; }
private readonly TListener listener; private readonly TListener listener;
private readonly ILogger logger; private readonly MessageQueues messageQueues;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) { protected MessageHandler(string loggerName, TListener listener) {
this.Logger = PhantomLogger.Create("MessageHandler", loggerName);
this.listener = listener; this.listener = listener;
this.logger = logger; this.messageQueues = new MessageQueues(loggerName + ":Receive");
this.taskManager = taskManager;
this.cancellationToken = cancellationToken;
} }
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested(); messageQueues.Enqueue(message.QueueKey, () => TryHandle<TMessage, TReply>(sequenceId, message));
taskManager.Run("Handle message " + message.GetType().Name, async () => {
try {
await Handle<TMessage, TReply>(sequenceId, message);
} catch (Exception e) {
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
}
});
} }
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { private async Task TryHandle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply = await message.Accept(listener); TReply reply;
try {
reply = await message.Accept(listener);
} catch (Exception e) {
Logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
return;
}
if (reply is not NoReply) { if (reply is not NoReply) {
await SendReply(sequenceId, MessageSerializer.Serialize(reply)); await SendReply(sequenceId, MessageSerializer.Serialize(reply));
} }
} }
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply); protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
internal Task StopReceiving() {
return messageQueues.Stop();
}
} }

View File

@ -0,0 +1,9 @@
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageQueueKey {
public string Name { get; }
public MessageQueueKey(string name) {
Name = name;
}
}

View File

@ -0,0 +1,53 @@
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
sealed class MessageQueues {
private readonly ILogger logger;
private readonly TaskManager taskManager;
private readonly Dictionary<MessageQueueKey, RpcQueue> queues = new ();
private Task? stopTask;
public MessageQueues(string loggerName) {
this.logger = PhantomLogger.Create<MessageQueues>(loggerName);
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(loggerName));
}
private RpcQueue GetOrCreateQueue(MessageQueueKey key) {
if (!queues.TryGetValue(key, out var queue)) {
queues[key] = queue = new RpcQueue(taskManager, "Message queue for " + key.Name);
}
return queue;
}
public Task Enqueue(MessageQueueKey key, Func<Task> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException(new OperationCanceledException());
}
}
public Task<T> Enqueue<T>(MessageQueueKey key, Func<Task<T>> task) {
lock (this) {
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException<T>(new OperationCanceledException());
}
}
internal Task Stop() {
lock (this) {
if (stopTask == null) {
logger.Debug("Stopping " + queues.Count + " message queue(s)...");
stopTask = Task.WhenAll(queues.Values.Select(static queue => queue.Stop()))
.ContinueWith(_ => logger.Debug("All queues stopped."));
queues.Clear();
}
return stopTask;
}
}
}

View File

@ -36,7 +36,7 @@ public sealed class MessageRegistry<TListener> {
codeToTypeMapping.Add(code, typeof(TMessage)); codeToTypeMapping.Add(code, typeof(TMessage));
} }
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) { internal bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
try { try {
var code = MessageSerializer.ReadCode(ref data); var code = MessageSerializer.ReadCode(ref data);
return codeToTypeMapping.TryGetValue(code, out type); return codeToTypeMapping.TryGetValue(code, out type);
@ -78,7 +78,7 @@ public sealed class MessageRegistry<TListener> {
} }
} }
public void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) { internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
ushort code; ushort code;
try { try {
code = MessageSerializer.ReadCode(ref data); code = MessageSerializer.ReadCode(ref data);

View File

@ -1,17 +1,18 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Phantom.Utils.Logging;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Utils.Rpc.Message; namespace Phantom.Utils.Rpc.Message;
public sealed class MessageReplyTracker { sealed class MessageReplyTracker {
private readonly ILogger logger; private readonly ILogger logger;
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16); private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
private uint lastSequenceId; private uint lastSequenceId;
internal MessageReplyTracker(ILogger logger) { internal MessageReplyTracker(string loggerName) {
this.logger = logger; this.logger = PhantomLogger.Create<MessageReplyTracker>(loggerName);
} }
public uint RegisterReply() { public uint RegisterReply() {
@ -42,14 +43,6 @@ public sealed class MessageReplyTracker {
ForgetReply(sequenceId); ForgetReply(sequenceId);
} }
} }
public async Task<TReply?> TryWaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
try {
return await WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
} catch (Exception) {
return null;
}
}
public void ForgetReply(uint sequenceId) { public void ForgetReply(uint sequenceId) {
if (replyTasks.TryRemove(sequenceId, out var task)) { if (replyTasks.TryRemove(sequenceId, out var task)) {

View File

@ -13,6 +13,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,8 +1,7 @@
using NetMQ; using NetMQ;
using Serilog;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(ILogger RuntimeLogger, ILogger TaskManagerLogger, string Host, ushort Port, NetMQCertificate ServerCertificate) { public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string TcpUrl => "tcp://" + Host + ":" + Port; public string TcpUrl => "tcp://" + Host + ":" + Port;
} }

View File

@ -1,54 +0,0 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc;
public sealed class RpcConnectionToServer<TListener> {
private readonly ClientSocket socket;
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageReplyTracker replyTracker;
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
this.socket = socket;
this.messageRegistry = messageRegistry;
this.replyTracker = replyTracker;
}
public async Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) {
await socket.SendAsync(bytes);
}
}
public async Task<TReply?> TrySend<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(bytes);
return await replyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
throw new ArgumentException("Could not write message.", nameof(message));
}
await socket.SendAsync(bytes);
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public void Receive(IReply message) {
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
}

View File

@ -3,7 +3,7 @@ using NetMQ.Sockets;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public static class RpcExtensions { static class RpcExtensions {
public static ReadOnlyMemory<byte> Receive(this ClientSocket socket, CancellationToken cancellationToken) { public static ReadOnlyMemory<byte> Receive(this ClientSocket socket, CancellationToken cancellationToken) {
var msg = new Msg(); var msg = new Msg();
msg.InitEmpty(); msg.InitEmpty();

View File

@ -0,0 +1,60 @@
using System.Threading.Channels;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc;
sealed class RpcQueue {
private readonly Channel<Func<Task>> channel = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions {
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});
private readonly Task processingTask;
public RpcQueue(TaskManager taskManager, string taskName) {
this.processingTask = taskManager.Run(taskName, Process);
}
public Task Enqueue(Action action) {
return Enqueue(() => {
action();
return Task.CompletedTask;
});
}
public Task Enqueue(Func<Task> task) {
var completionSource = AsyncTasks.CreateCompletionSource();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
public Task<T> Enqueue<T>(Func<Task<T>> task) {
var completionSource = AsyncTasks.CreateCompletionSource<T>();
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
completionSource.SetCanceled();
}
return completionSource.Task;
}
private async Task Process() {
try {
await foreach (var task in channel.Reader.ReadAllAsync()) {
await task();
}
} catch (OperationCanceledException) {
// Ignore.
}
}
public Task Stop() {
channel.Writer.Complete();
return processingTask;
}
}

View File

@ -1,49 +0,0 @@
using NetMQ;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Utils.Rpc;
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
private readonly TSocket socket;
private readonly ILogger runtimeLogger;
private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager;
protected RpcRuntime(RpcSocket<TSocket> socket) {
this.socket = socket.Socket;
this.runtimeLogger = socket.Config.RuntimeLogger;
this.replyTracker = socket.ReplyTracker;
this.taskManager = new TaskManager(socket.Config.TaskManagerLogger);
}
protected async Task Launch() {
void RunTask() {
try {
Run(socket, runtimeLogger, replyTracker, taskManager);
} catch (Exception e) {
runtimeLogger.Error(e, "Caught exception in RPC thread.");
}
}
try {
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} catch (OperationCanceledException) {
// Ignore.
} finally {
await taskManager.Stop();
await Disconnect(socket, runtimeLogger);
socket.Dispose();
runtimeLogger.Information("ZeroMQ runtime stopped.");
}
}
protected abstract void Run(TSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager);
protected virtual Task Disconnect(TSocket socket, ILogger logger) {
return Task.CompletedTask;
}
}

View File

@ -1,6 +1,6 @@
namespace Phantom.Controller.Rpc; namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcClientConnectionClosedEventArgs : EventArgs { sealed class RpcClientConnectionClosedEventArgs : EventArgs {
internal uint RoutingId { get; } internal uint RoutingId { get; }
internal RpcClientConnectionClosedEventArgs(uint routingId) { internal RpcClientConnectionClosedEventArgs(uint routingId) {

View File

@ -1,11 +1,10 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
using Serilog.Events; using Serilog.Events;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
private readonly RpcConnectionToServer<TServerListener> connection; private readonly RpcConnectionToServer<TServerListener> connection;
@ -23,18 +22,18 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
this.receiveCancellationToken = receiveCancellationToken; this.receiveCancellationToken = receiveCancellationToken;
} }
protected sealed override void Run(ClientSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) { private protected sealed override Task Run(ClientSocket socket) {
RunWithConnection(socket, connection, logger, taskManager); return RunWithConnection(socket, connection);
} }
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, ILogger logger, TaskManager taskManager) { protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection) {
var handler = new Handler(connection, messageDefinitions, messageListener, logger, taskManager, receiveCancellationToken); var handler = new Handler(LoggerName, connection, messageDefinitions, messageListener);
try { try {
while (!receiveCancellationToken.IsCancellationRequested) { while (!receiveCancellationToken.IsCancellationRequested) {
var data = socket.Receive(receiveCancellationToken); var data = socket.Receive(receiveCancellationToken);
LogMessageType(logger, data); LogMessageType(RuntimeLogger, data);
if (data.Length > 0) { if (data.Length > 0) {
messageDefinitions.ToClient.Handle(data, handler); messageDefinitions.ToClient.Handle(data, handler);
@ -43,11 +42,25 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // Ignore.
} finally { } finally {
logger.Debug("ZeroMQ client stopped receiving messages."); await handler.StopReceiving();
disconnectSemaphore.Wait(CancellationToken.None); RuntimeLogger.Debug("ZeroMQ client stopped receiving messages.");
await disconnectSemaphore.WaitAsync(CancellationToken.None);
} }
} }
private protected sealed override async Task Disconnect(ClientSocket socket) {
try {
await connection.StopSending().WaitAsync(TimeSpan.FromSeconds(10), CancellationToken.None);
} catch (TimeoutException) {
RuntimeLogger.Error("Timed out waiting for message sending queue.");
}
await SendDisconnectMessage(socket, RuntimeLogger);
}
protected abstract Task SendDisconnectMessage(ClientSocket socket, ILogger logger);
private void LogMessageType(ILogger logger, ReadOnlyMemory<byte> data) { private void LogMessageType(ILogger logger, ReadOnlyMemory<byte> data) {
if (!logger.IsEnabled(LogEventLevel.Verbose)) { if (!logger.IsEnabled(LogEventLevel.Verbose)) {
return; return;
@ -65,7 +78,7 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
private readonly RpcConnectionToServer<TServerListener> connection; private readonly RpcConnectionToServer<TServerListener> connection;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) { public Handler(string loggerName, RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener) : base(loggerName, listener) {
this.connection = connection; this.connection = connection;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
} }

View File

@ -0,0 +1,53 @@
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcConnection<TListener> {
private readonly MessageRegistry<TListener> messageRegistry;
private readonly MessageQueues sendingQueues;
private readonly MessageReplyTracker replyTracker;
internal RpcConnection(string loggerName, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
this.messageRegistry = messageRegistry;
this.sendingQueues = new MessageQueues(loggerName + ":Send");
this.replyTracker = replyTracker;
}
private protected abstract ValueTask Send(byte[] bytes);
public Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask(message));
}
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
return sendingQueues.Enqueue(message.QueueKey, () => SendTask<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken));
}
private async Task SendTask<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
var bytes = messageRegistry.Write(message).ToArray();
if (bytes.Length > 0) {
await Send(bytes);
}
}
private async Task<TReply> SendTask<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
var sequenceId = replyTracker.RegisterReply();
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
throw new ArgumentException("Could not write message.", nameof(message));
}
await Send(bytes);
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
}
public void Receive(IReply message) {
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
internal Task StopSending() {
return sendingQueues.Stop();
}
}

View File

@ -0,0 +1,51 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToClient<TListener> : RpcConnection<TListener> {
private readonly ServerSocket socket;
private readonly uint routingId;
private readonly TaskCompletionSource<bool> authorizationCompletionSource = new ();
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
public bool IsClosed { get; private set; }
internal RpcConnectionToClient(string loggerName, ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
this.socket = socket;
this.routingId = routingId;
}
internal Task<bool> GetAuthorization() {
return authorizationCompletionSource.Task;
}
public void SetAuthorizationResult(bool isAuthorized) {
authorizationCompletionSource.SetResult(isAuthorized);
}
public bool IsSame(RpcConnectionToClient<TListener> other) {
return this.routingId == other.routingId && this.socket == other.socket;
}
public void Close() {
bool hasClosed = false;
lock (this) {
if (!IsClosed) {
IsClosed = true;
hasClosed = true;
}
}
if (hasClosed) {
Closed?.Invoke(this, new RpcClientConnectionClosedEventArgs(routingId));
}
}
private protected override ValueTask Send(byte[] bytes) {
return socket.SendAsync(routingId, bytes);
}
}

View File

@ -0,0 +1,17 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
private readonly ClientSocket socket;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
this.socket = socket;
}
private protected override ValueTask Send(byte[] bytes) {
return socket.SendAsync(bytes);
}
}

View File

@ -0,0 +1,50 @@
using System.Diagnostics.CodeAnalysis;
using NetMQ;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Serilog;
namespace Phantom.Utils.Rpc.Runtime;
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
private readonly TSocket socket;
private protected string LoggerName { get; }
private protected ILogger RuntimeLogger { get; }
private protected MessageReplyTracker ReplyTracker { get; }
protected RpcRuntime(RpcSocket<TSocket> socket) {
this.socket = socket.Socket;
this.LoggerName = socket.Config.LoggerName;
this.RuntimeLogger = PhantomLogger.Create(LoggerName);
this.ReplyTracker = socket.ReplyTracker;
}
protected async Task Launch() {
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
async Task RunTask() {
try {
await Run(socket);
} catch (Exception e) {
RuntimeLogger.Error(e, "Caught exception in RPC thread.");
}
}
try {
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
} catch (OperationCanceledException) {
// Ignore.
} finally {
await Disconnect(socket);
socket.Dispose();
RuntimeLogger.Information("ZeroMQ runtime stopped.");
}
}
private protected abstract Task Run(TSocket socket);
private protected abstract Task Disconnect(TSocket socket);
}

View File

@ -0,0 +1,145 @@
using System.Collections.Concurrent;
using NetMQ.Sockets;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Tasks;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
}
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken;
}
private protected override Task Run(ServerSocket socket) {
var clients = new ConcurrentDictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
if (!clients.Remove(e.RoutingId, out var client)) {
return;
}
RuntimeLogger.Debug("Closing connection to {RoutingId}.", e.RoutingId);
client.Connection.Closed -= OnConnectionClosed;
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await client.StopReceiving();
await client.StopProcessing();
await client.Connection.StopSending();
RuntimeLogger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
});
}
while (!cancellationToken.IsCancellationRequested) {
var (routingId, data) = socket.Receive(cancellationToken);
if (data.Length == 0) {
LogMessageType(routingId, data, messageType: null);
continue;
}
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
if (!clients.TryGetValue(routingId, out var client)) {
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection));
clients[routingId] = client;
}
LogMessageType(routingId, data, messageType);
client.Enqueue(messageType, data);
}
foreach (var client in clients.Values) {
client.Connection.Close();
}
return Task.CompletedTask;
}
private protected override Task Disconnect(ServerSocket socket) {
return Task.CompletedTask;
}
private void LogMessageType(uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
if (!RuntimeLogger.IsEnabled(LogEventLevel.Verbose)) {
return;
}
if (data.Length > 0 && messageType != null) {
RuntimeLogger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
}
else {
RuntimeLogger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
}
}
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener) : base(loggerName, listener) {
this.Connection = connection;
this.processingQueue = processingQueue;
this.messageDefinitions = messageDefinitions;
}
internal void Enqueue(Type? messageType, ReadOnlyMemory<byte> data) {
if (!Connection.GetAuthorization().IsCompleted && messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
processingQueue.Enqueue(() => Handle(data));
}
else {
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
internal Task StopProcessing() {
return processingQueue.Stop();
}
}
}

View File

@ -1,6 +1,8 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc.Sockets; namespace Phantom.Utils.Rpc.Sockets;
@ -21,20 +23,20 @@ public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMess
RpcSocket.SetDefaultSocketOptions(options); RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl; var url = config.TcpUrl;
var logger = config.RuntimeLogger; var logger = PhantomLogger.Create(config.LoggerName);
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url); logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
socket.Connect(url); socket.Connect(url);
logger.Information("ZeroMQ client ready."); logger.Information("ZeroMQ client ready.");
return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions); return new RpcClientSocket<TClientListener, TServerListener, TReplyMessage>(socket, config, messageDefinitions);
} }
public RpcConnectionToServer<TServerListener> Connection { get; } public RpcConnectionToServer<TServerListener> Connection { get; }
internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; } internal IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions { get; }
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) { private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
MessageDefinitions = messageDefinitions; MessageDefinitions = messageDefinitions;
Connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, ReplyTracker); Connection = new RpcConnectionToServer<TServerListener>(config.LoggerName, socket, messageDefinitions.ToServer, ReplyTracker);
} }
} }

View File

@ -1,4 +1,5 @@
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Logging;
namespace Phantom.Utils.Rpc.Sockets; namespace Phantom.Utils.Rpc.Sockets;
@ -12,7 +13,7 @@ public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
RpcSocket.SetDefaultSocketOptions(options); RpcSocket.SetDefaultSocketOptions(options);
var url = config.TcpUrl; var url = config.TcpUrl;
var logger = config.RuntimeLogger; var logger = PhantomLogger.Create(config.LoggerName);
logger.Information("Starting ZeroMQ server on {Url}...", url); logger.Information("Starting ZeroMQ server on {Url}...", url);
socket.Bind(url); socket.Bind(url);

View File

@ -20,6 +20,6 @@ public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
protected RpcSocket(TSocket socket, RpcConfiguration config) { protected RpcSocket(TSocket socket, RpcConfiguration config) {
Socket = socket; Socket = socket;
Config = config; Config = config;
ReplyTracker = new MessageReplyTracker(config.RuntimeLogger); ReplyTracker = new MessageReplyTracker(config.LoggerName);
} }
} }

View File

@ -8,4 +8,28 @@ public static class AsyncTasks {
public static TaskCompletionSource<T> CreateCompletionSource<T>() { public static TaskCompletionSource<T> CreateCompletionSource<T>() {
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously); return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
} }
public static void SetResultFrom(this TaskCompletionSource completionSource, Task task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult();
}
}
public static void SetResultFrom<T>(this TaskCompletionSource<T> completionSource, Task<T> task) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(task.Result);
}
}
} }

View File

@ -1,7 +1,7 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Phantom.Common.Data.Web.Agent; using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Logging;
using Phantom.Utils.Events; using Phantom.Utils.Events;
using Phantom.Utils.Logging;
namespace Phantom.Web.Services.Agents; namespace Phantom.Web.Services.Agents;

View File

@ -1,6 +1,6 @@
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Logging;
using Phantom.Web.Services.Rpc; using Phantom.Web.Services.Rpc;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;

View File

@ -1,7 +1,7 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Security.Cryptography; using System.Security.Cryptography;
using Microsoft.AspNetCore.Components.Server.ProtectedBrowserStorage; using Microsoft.AspNetCore.Components.Server.ProtectedBrowserStorage;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Authentication; namespace Phantom.Web.Services.Authentication;

View File

@ -1,8 +1,8 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Immutable; using System.Collections.Immutable;
using Phantom.Common.Logging;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Phantom.Utils.Events; using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
namespace Phantom.Web.Services.Instances; namespace Phantom.Web.Services.Instances;

View File

@ -3,9 +3,9 @@ using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Events; using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using Phantom.Web.Services.Rpc; using Phantom.Web.Services.Rpc;
namespace Phantom.Web.Services.Instances; namespace Phantom.Web.Services.Instances;

View File

@ -12,9 +12,9 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data.Web\Phantom.Common.Data.Web.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Logging\Phantom.Common.Logging.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -1,5 +1,5 @@
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Web.Services.Rpc; namespace Phantom.Web.Services.Rpc;

View File

@ -1,8 +1,8 @@
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToWeb; using Phantom.Common.Messages.Web.ToWeb;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web.Services.Agents; using Phantom.Web.Services.Agents;
using Phantom.Web.Services.Instances; using Phantom.Web.Services.Instances;

View File

@ -3,7 +3,7 @@ using NetMQ.Sockets;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.BiDirectional; using Phantom.Common.Messages.Web.BiDirectional;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
@ -15,8 +15,8 @@ public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, I
} }
private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {} private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
protected override async Task Disconnect(ClientSocket socket, ILogger logger) { protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray(); var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray();
try { try {
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None); await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);

View File

@ -1,7 +1,7 @@
using Microsoft.AspNetCore.Components; using Microsoft.AspNetCore.Components;
using Microsoft.AspNetCore.Components.Authorization; using Microsoft.AspNetCore.Components.Authorization;
using Phantom.Common.Data.Web.Users; using Phantom.Common.Data.Web.Users;
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Web.Services.Authorization; using Phantom.Web.Services.Authorization;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
using UserInfo = Phantom.Web.Services.Authentication.UserInfo; using UserInfo = Phantom.Web.Services.Authentication.UserInfo;

View File

@ -1,10 +1,10 @@
using System.Reflection; using System.Reflection;
using NetMQ; using NetMQ;
using Phantom.Common.Logging;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Common.Messages.Web.ToController; using Phantom.Common.Messages.Web.ToController;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
@ -48,7 +48,7 @@ try {
var (controllerCertificate, webToken) = webKey.Value; var (controllerCertificate, webToken) = webKey.Value;
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate); var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken)); var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken); var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);

View File

@ -1,4 +1,4 @@
using Phantom.Common.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Runtime; using Phantom.Utils.Runtime;
namespace Phantom.Web; namespace Phantom.Web;

View File

@ -1,8 +1,8 @@
using NetMQ; using NetMQ;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Logging;
using Phantom.Utils.Cryptography; using Phantom.Utils.Cryptography;
using Phantom.Utils.IO; using Phantom.Utils.IO;
using Phantom.Utils.Logging;
using ILogger = Serilog.ILogger; using ILogger = Serilog.ILogger;
namespace Phantom.Web; namespace Phantom.Web;

View File

@ -1,6 +1,6 @@
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Utils.Rpc; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
using Phantom.Web.Base; using Phantom.Web.Base;
using Phantom.Web.Services; using Phantom.Web.Services;