1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-11-25 16:42:54 +01:00

Compare commits

...

2 Commits

Author SHA1 Message Date
2cc7975193
Rework message replies 2022-12-30 03:26:06 +01:00
6472134f9a
Update all NuGet packages 2022-12-29 18:20:24 +01:00
56 changed files with 562 additions and 427 deletions

View File

@ -1,5 +1,4 @@
using NetMQ.Sockets;
using Phantom.Common.Logging;
using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Runtime;
using Serilog;
@ -11,11 +10,11 @@ sealed class KeepAliveLoop {
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
private readonly ClientSocket socket;
private readonly RpcServerConnection connection;
private readonly CancellationTokenSource cancellationTokenSource = new ();
public KeepAliveLoop(ClientSocket socket, TaskManager taskManager) {
this.socket = socket;
public KeepAliveLoop(RpcServerConnection connection, TaskManager taskManager) {
this.connection = connection;
taskManager.Run(Run);
}
@ -26,7 +25,7 @@ sealed class KeepAliveLoop {
try {
while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken);
await socket.SendMessage(new AgentIsAliveMessage());
await connection.Send(new AgentIsAliveMessage());
}
} catch (OperationCanceledException) {
// Ignore.

View File

@ -1,19 +0,0 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
namespace Phantom.Agent.Rpc;
public static class RpcExtensions {
internal static async Task SendMessage<TMessage>(this ClientSocket socket, TMessage message) where TMessage : IMessageToServer {
byte[] bytes = MessageRegistries.ToServer.Write(message).ToArray();
if (bytes.Length > 0) {
await socket.SendAsync(bytes);
}
}
public static Task SendSimpleReply<TMessage, TReplyEnum>(this ClientSocket socket, TMessage message, TReplyEnum reply) where TMessage : IMessageWithReply where TReplyEnum : Enum {
return SendMessage(socket, SimpleReplyMessage.FromEnum(message.SequenceId, reply));
}
}

View File

@ -4,6 +4,7 @@ using Phantom.Common.Data.Agent;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Runtime;
using Serilog;
using Serilog.Events;
@ -11,7 +12,7 @@ using Serilog.Events;
namespace Phantom.Agent.Rpc;
public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<ClientSocket, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
var socket = new ClientSocket();
var options = socket.Options;
@ -24,12 +25,12 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
private readonly RpcConfiguration config;
private readonly Guid agentGuid;
private readonly Func<ClientSocket, IMessageToAgentListener> messageListenerFactory;
private readonly Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory;
private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken;
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<ClientSocket, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) {
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) {
this.config = config;
this.agentGuid = agentGuid;
this.messageListenerFactory = messageListenerFactory;
@ -46,12 +47,13 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
logger.Information("ZeroMQ client ready.");
}
protected override void Run(ClientSocket socket, TaskManager taskManager) {
var logger = config.Logger;
var listener = messageListenerFactory(socket);
protected override void Run(ClientSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var connection = new RpcServerConnection(socket, replyTracker);
ServerMessaging.SetCurrentConnection(connection);
ServerMessaging.SetCurrentSocket(socket);
var keepAliveLoop = new KeepAliveLoop(socket, taskManager);
var logger = config.Logger;
var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
var keepAliveLoop = new KeepAliveLoop(connection, taskManager);
try {
while (!receiveCancellationToken.IsCancellationRequested) {
@ -60,7 +62,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
LogMessageType(logger, data);
if (data.Length > 0) {
MessageRegistries.ToAgent.Handle(data, listener, taskManager, receiveCancellationToken);
MessageRegistries.ToAgent.Handle(data, handler);
}
}
} catch (OperationCanceledException) {
@ -86,11 +88,19 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
}
}
protected override async Task Disconnect(ClientSocket socket) {
protected override async Task Disconnect() {
var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None);
var finishedTask = await Task.WhenAny(socket.SendMessage(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
if (finishedTask == unregisterTimeoutTask) {
config.Logger.Error("Timed out communicating agent shutdown with the server.");
}
}
private sealed class MessageToAgentHandler : MessageHandler<IMessageToAgentListener> {
public MessageToAgentHandler(IMessageToAgentListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return ServerMessaging.Send(new ReplyMessage(sequenceId, serializedReply));
}
}
}

View File

@ -0,0 +1,46 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Agent.Rpc;
public sealed class RpcServerConnection {
private readonly ClientSocket socket;
private readonly MessageReplyTracker replyTracker;
internal RpcServerConnection(ClientSocket socket, MessageReplyTracker replyTracker) {
this.socket = socket;
this.replyTracker = replyTracker;
}
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToServer<TReply> {
return MessageRegistries.ToServer.Write<TMessage, TReply>(message).ToArray();
}
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
var bytes = WriteBytes<TMessage, NoReply>(message);
if (bytes.Length > 0) {
await socket.SendAsync(bytes);
}
}
internal async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
var sequenceId = replyTracker.RegisterReply();
var message = messageFactory(sequenceId);
var bytes = WriteBytes<TMessage, TReply>(message);
if (bytes.Length == 0) {
replyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(bytes);
return await replyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
}
public void Receive(ReplyMessage message) {
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
}

View File

@ -1,5 +1,4 @@
using NetMQ.Sockets;
using Phantom.Common.Logging;
using Phantom.Common.Logging;
using Phantom.Common.Messages;
using Serilog;
@ -8,23 +7,28 @@ namespace Phantom.Agent.Rpc;
public static class ServerMessaging {
private static readonly ILogger Logger = PhantomLogger.Create(typeof(ServerMessaging));
private static ClientSocket? CurrentSocket { get; set; }
private static readonly object SetCurrentSocketLock = new ();
private static RpcServerConnection? CurrentConnection { get; set; }
private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
internal static void SetCurrentSocket(ClientSocket socket) {
lock (SetCurrentSocketLock) {
if (CurrentSocket != null) {
throw new InvalidOperationException("Server socket can only be set once.");
private static readonly object SetCurrentConnectionLock = new ();
internal static void SetCurrentConnection(RpcServerConnection connection) {
lock (SetCurrentConnectionLock) {
if (CurrentConnection != null) {
throw new InvalidOperationException("Server connection can only be set once.");
}
CurrentSocket = socket;
CurrentConnection = connection;
}
Logger.Information("Server socket ready.");
Logger.Information("Server connection ready.");
}
public static async Task SendMessage<TMessage>(TMessage message) where TMessage : IMessageToServer {
var currentSocket = CurrentSocket ?? throw new InvalidOperationException("Server socket not ready.");
await currentSocket.SendMessage(message);
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
return CurrentConnectionOrThrow.Send(message);
}
public static Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
return CurrentConnectionOrThrow.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
}
}

View File

@ -51,7 +51,7 @@ sealed class Instance : IDisposable {
}
private async Task ReportLastStatus() {
await ServerMessaging.SendMessage(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
}
private void TransitionState(IInstanceState newState) {
@ -143,7 +143,7 @@ sealed class Instance : IDisposable {
instance.launchServices.TaskManager.Run(async () => {
if (myStatusUpdateCounter == statusUpdateCounter) {
instance.currentStatus = newStatus;
await ServerMessaging.SendMessage(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
}
});
}

View File

@ -52,7 +52,7 @@ sealed class InstanceLogSender {
private async Task SendOutputToServer(ImmutableArray<string> lines) {
if (!lines.IsEmpty) {
await ServerMessaging.SendMessage(new InstanceOutputMessage(instanceGuid, lines));
await ServerMessaging.Send(new InstanceOutputMessage(instanceGuid, lines));
}
}

View File

@ -1,4 +1,5 @@
using Phantom.Agent.Minecraft.Instance;
using System.Diagnostics.CodeAnalysis;
using Phantom.Agent.Minecraft.Instance;
using Phantom.Agent.Minecraft.Java;
using Phantom.Agent.Minecraft.Launcher;
using Phantom.Agent.Minecraft.Launcher.Types;
@ -40,11 +41,31 @@ sealed class InstanceSessionManager : IDisposable {
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
}
public async Task<ConfigureInstanceResult> Configure(InstanceConfiguration configuration) {
[SuppressMessage("ReSharper", "ConvertIfStatementToReturnStatement")]
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRunWithInstance<T>(Guid instanceGuid, Func<Instance, Task<T>> func) {
try {
await semaphore.WaitAsync(shutdownCancellationToken);
} catch (OperationCanceledException) {
return ConfigureInstanceResult.AgentShuttingDown;
return InstanceActionResult.General<T>(InstanceActionGeneralResult.AgentShuttingDown);
}
try {
if (!instances.TryGetValue(instanceGuid, out var instance)) {
return InstanceActionResult.General<T>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
else {
return InstanceActionResult.Concrete(await func(instance));
}
} finally {
semaphore.Release();
}
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration) {
try {
await semaphore.WaitAsync(shutdownCancellationToken);
} catch (OperationCanceledException) {
return InstanceActionResult.General<ConfigureInstanceResult>(InstanceActionGeneralResult.AgentShuttingDown);
}
var instanceGuid = configuration.InstanceGuid;
@ -52,12 +73,12 @@ sealed class InstanceSessionManager : IDisposable {
try {
var otherInstances = instances.Values.Where(inst => inst.Configuration.InstanceGuid != instanceGuid).ToArray();
if (otherInstances.Length + 1 > agentInfo.MaxInstances) {
return ConfigureInstanceResult.InstanceLimitExceeded;
return InstanceActionResult.Concrete(ConfigureInstanceResult.InstanceLimitExceeded);
}
var availableMemory = agentInfo.MaxMemory - otherInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
if (availableMemory < configuration.MemoryAllocation) {
return ConfigureInstanceResult.MemoryLimitExceeded;
return InstanceActionResult.Concrete(ConfigureInstanceResult.MemoryLimitExceeded);
}
var heapMegabytes = configuration.MemoryAllocation.InMegabytes;
@ -93,70 +114,22 @@ sealed class InstanceSessionManager : IDisposable {
await instance.Launch(shutdownCancellationToken);
}
return ConfigureInstanceResult.Success;
return InstanceActionResult.Concrete(ConfigureInstanceResult.Success);
} finally {
semaphore.Release();
}
}
public async Task<LaunchInstanceResult> Launch(Guid instanceGuid) {
try {
await semaphore.WaitAsync(shutdownCancellationToken);
} catch (OperationCanceledException) {
return LaunchInstanceResult.AgentShuttingDown;
public Task<InstanceActionResult<LaunchInstanceResult>> Launch(Guid instanceGuid) {
return AcquireSemaphoreAndRunWithInstance(instanceGuid, instance => instance.Launch(shutdownCancellationToken));
}
try {
if (!instances.TryGetValue(instanceGuid, out var instance)) {
return LaunchInstanceResult.InstanceDoesNotExist;
}
else {
return await instance.Launch(shutdownCancellationToken);
}
} finally {
semaphore.Release();
}
public Task<InstanceActionResult<StopInstanceResult>> Stop(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
return AcquireSemaphoreAndRunWithInstance(instanceGuid, instance => instance.Stop(stopStrategy));
}
public async Task<StopInstanceResult> Stop(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
try {
await semaphore.WaitAsync(shutdownCancellationToken);
} catch (OperationCanceledException) {
return StopInstanceResult.AgentShuttingDown;
}
try {
if (!instances.TryGetValue(instanceGuid, out var instance)) {
return StopInstanceResult.InstanceDoesNotExist;
}
else {
return await instance.Stop(stopStrategy);
}
} finally {
semaphore.Release();
}
}
public async Task<SendCommandToInstanceResult> SendCommand(Guid instanceGuid, string command) {
try {
await semaphore.WaitAsync(shutdownCancellationToken);
} catch (OperationCanceledException) {
return SendCommandToInstanceResult.AgentShuttingDown;
}
try {
if (!instances.TryGetValue(instanceGuid, out var instance)) {
return SendCommandToInstanceResult.InstanceDoesNotExist;
}
if (!await instance.SendCommand(command, shutdownCancellationToken)) {
return SendCommandToInstanceResult.UnknownError;
}
return SendCommandToInstanceResult.Success;
} finally {
semaphore.Release();
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid instanceGuid, string command) {
return AcquireSemaphoreAndRunWithInstance(instanceGuid, async instance => await instance.SendCommand(command, shutdownCancellationToken) ? SendCommandToInstanceResult.Success : SendCommandToInstanceResult.UnknownError);
}
public async Task StopAll() {

View File

@ -1,10 +1,10 @@
using NetMQ.Sockets;
using Phantom.Agent.Rpc;
using Phantom.Agent.Rpc;
using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
using Serilog;
namespace Phantom.Agent.Services.Rpc;
@ -12,32 +12,34 @@ namespace Phantom.Agent.Services.Rpc;
public sealed class MessageListener : IMessageToAgentListener {
private static ILogger Logger { get; } = PhantomLogger.Create<MessageListener>();
private readonly ClientSocket socket;
private readonly RpcServerConnection connection;
private readonly AgentServices agent;
private readonly CancellationTokenSource shutdownTokenSource;
public MessageListener(ClientSocket socket, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
this.socket = socket;
public MessageListener(RpcServerConnection connection, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
this.connection = connection;
this.agent = agent;
this.shutdownTokenSource = shutdownTokenSource;
}
public async Task HandleRegisterAgentSuccessResult(RegisterAgentSuccessMessage message) {
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
foreach (var instanceInfo in message.InitialInstances) {
if (await agent.InstanceSessionManager.Configure(instanceInfo) != ConfigureInstanceResult.Success) {
var result = await agent.InstanceSessionManager.Configure(instanceInfo);
if (!result.Is(ConfigureInstanceResult.Success)) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", instanceInfo.InstanceName, instanceInfo.InstanceGuid);
shutdownTokenSource.Cancel();
return;
return NoReply.Instance;
}
}
await ServerMessaging.SendMessage(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await ServerMessaging.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
return NoReply.Instance;
}
public Task HandleRegisterAgentFailureResult(RegisterAgentFailureMessage message) {
public Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
string errorMessage = message.FailureKind switch {
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
RegisterAgentFailure.InvalidToken => "Invalid token.",
@ -47,22 +49,27 @@ public sealed class MessageListener : IMessageToAgentListener {
Logger.Fatal("Agent authentication failed: {Error}", errorMessage);
Environment.Exit(1);
return Task.CompletedTask;
return Task.FromResult(NoReply.Instance);
}
public async Task HandleConfigureInstance(ConfigureInstanceMessage message) {
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Configure(message.Configuration));
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
return await agent.InstanceSessionManager.Configure(message.Configuration);
}
public async Task HandleLaunchInstance(LaunchInstanceMessage message) {
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Launch(message.InstanceGuid));
public async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return await agent.InstanceSessionManager.Launch(message.InstanceGuid);
}
public async Task HandleStopInstance(StopInstanceMessage message) {
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy));
public async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy);
}
public async Task HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command));
public async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command);
}
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -1,5 +1,4 @@
using System.Reflection;
using NetMQ.Sockets;
using Phantom.Agent;
using Phantom.Agent.Rpc;
using Phantom.Agent.Services;
@ -45,8 +44,8 @@ try {
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders);
MessageListener MessageListenerFactory(ClientSocket socket) {
return new MessageListener(socket, agentServices, shutdownCancellationTokenSource);
MessageListener MessageListenerFactory(RpcServerConnection connection) {
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);
}
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");

View File

@ -2,7 +2,6 @@
public enum ConfigureInstanceResult {
Success,
AgentShuttingDown,
InstanceLimitExceeded,
MemoryLimitExceeded
}

View File

@ -0,0 +1,8 @@
namespace Phantom.Common.Data.Replies;
public enum InstanceActionGeneralResult : byte {
None,
AgentShuttingDown,
AgentIsNotResponding,
InstanceDoesNotExist
}

View File

@ -0,0 +1,41 @@
using MemoryPack;
namespace Phantom.Common.Data.Replies;
[MemoryPackable]
public sealed partial record InstanceActionResult<T>(
[property: MemoryPackOrder(0)] InstanceActionGeneralResult GeneralResult,
[property: MemoryPackOrder(1)] T? ConcreteResult
) {
public bool Is(T? concreteResult) {
return GeneralResult == InstanceActionGeneralResult.None && EqualityComparer<T>.Default.Equals(ConcreteResult, concreteResult);
}
public InstanceActionResult<T2> Map<T2>(Func<T, T2> mapper) {
return new InstanceActionResult<T2>(GeneralResult, ConcreteResult is not null ? mapper(ConcreteResult) : default);
}
public string ToSentence(Func<T, string> concreteResultToSentence) {
return GeneralResult switch {
InstanceActionGeneralResult.None => concreteResultToSentence(ConcreteResult!),
InstanceActionGeneralResult.AgentShuttingDown => "Agent is shutting down.",
InstanceActionGeneralResult.AgentIsNotResponding => "Agent is not responding.",
InstanceActionGeneralResult.InstanceDoesNotExist => "Instance does not exist.",
_ => "Unknown result."
};
}
}
public static class InstanceActionResult {
public static InstanceActionResult<T> General<T>(InstanceActionGeneralResult generalResult) {
return new InstanceActionResult<T>(generalResult, default);
}
public static InstanceActionResult<T> Concrete<T>(T? concreteResult) {
return new InstanceActionResult<T>(InstanceActionGeneralResult.None, concreteResult);
}
public static InstanceActionResult<T> DidNotReplyIfNull<T>(this InstanceActionResult<T>? result) {
return result ?? General<T>(InstanceActionGeneralResult.AgentIsNotResponding);
}
}

View File

@ -2,12 +2,9 @@
public enum LaunchInstanceResult {
LaunchInitiated,
AgentShuttingDown,
InstanceDoesNotExist,
InstanceAlreadyLaunching,
InstanceAlreadyRunning,
InstanceIsStopping,
CommunicationError,
UnknownError
}
@ -15,12 +12,9 @@ public static class LaunchInstanceResultExtensions {
public static string ToSentence(this LaunchInstanceResult reason) {
return reason switch {
LaunchInstanceResult.LaunchInitiated => "Launch initiated.",
LaunchInstanceResult.AgentShuttingDown => "Agent is shutting down.",
LaunchInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
LaunchInstanceResult.InstanceAlreadyLaunching => "Instance is already launching.",
LaunchInstanceResult.InstanceAlreadyRunning => "Instance is already running.",
LaunchInstanceResult.InstanceIsStopping => "Instance is stopping.",
LaunchInstanceResult.CommunicationError => "Communication error.",
_ => "Unknown error."
};
}

View File

@ -2,9 +2,6 @@
public enum SendCommandToInstanceResult {
Success,
InstanceDoesNotExist,
AgentShuttingDown,
AgentCommunicationError,
UnknownError
}
@ -12,9 +9,6 @@ public static class SendCommandToInstanceResultExtensions {
public static string ToSentence(this SendCommandToInstanceResult reason) {
return reason switch {
SendCommandToInstanceResult.Success => "Command sent.",
SendCommandToInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
SendCommandToInstanceResult.AgentShuttingDown => "Agent is shutting down.",
SendCommandToInstanceResult.AgentCommunicationError => "Agent did not reply in time.",
_ => "Unknown error."
};
}

View File

@ -2,11 +2,8 @@
public enum StopInstanceResult {
StopInitiated,
AgentShuttingDown,
InstanceDoesNotExist,
InstanceAlreadyStopping,
InstanceAlreadyStopped,
CommunicationError,
UnknownError
}
@ -14,11 +11,8 @@ public static class StopInstanceResultExtensions {
public static string ToSentence(this StopInstanceResult reason) {
return reason switch {
StopInstanceResult.StopInitiated => "Stopping initiated.",
StopInstanceResult.AgentShuttingDown => "Agent is shutting down.",
StopInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
StopInstanceResult.InstanceAlreadyStopping => "Instance is already stopping.",
StopInstanceResult.InstanceAlreadyStopped => "Instance is already stopped.",
StopInstanceResult.CommunicationError => "Communication error.",
_ => "Unknown error."
};
}

View File

@ -0,0 +1,18 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable]
public sealed partial record ReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] byte[] SerializedReply
) : IMessageToServer, IMessageToAgent {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleReply(this);
}
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleReply(this);
}
}

View File

@ -2,4 +2,8 @@
namespace Phantom.Common.Messages;
public interface IMessageToAgent : IMessage<IMessageToAgentListener> {}
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
public interface IMessageToAgent : IMessageToAgent<NoReply> {
uint IMessage<IMessageToAgentListener, NoReply>.SequenceId => 0;
}

View File

@ -1,12 +1,16 @@
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
public interface IMessageToAgentListener {
Task HandleRegisterAgentSuccessResult(RegisterAgentSuccessMessage message);
Task HandleRegisterAgentFailureResult(RegisterAgentFailureMessage message);
Task HandleConfigureInstance(ConfigureInstanceMessage message);
Task HandleLaunchInstance(LaunchInstanceMessage message);
Task HandleStopInstance(StopInstanceMessage message);
Task HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message);
Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message);
Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message);
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -2,4 +2,8 @@
namespace Phantom.Common.Messages;
public interface IMessageToServer : IMessage<IMessageToServerListener> {}
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
public interface IMessageToServer : IMessageToServer<NoReply> {
uint IMessage<IMessageToServerListener, NoReply>.SequenceId => 0;
}

View File

@ -1,14 +1,15 @@
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
public interface IMessageToServerListener {
bool IsDisposed { get; }
Task HandleRegisterAgent(RegisterAgentMessage message);
Task HandleUnregisterAgent(UnregisterAgentMessage message);
Task HandleAgentIsAlive(AgentIsAliveMessage message);
Task HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
Task HandleReportInstanceStatus(ReportInstanceStatusMessage message);
Task HandleInstanceOutput(InstanceOutputMessage message);
Task HandleSimpleReply(SimpleReplyMessage message);
Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message);
Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message);
Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message);
Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message);
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
Task<NoReply> HandleReply(ReplyMessage message);
}

View File

@ -1,5 +0,0 @@
namespace Phantom.Common.Messages;
public interface IMessageWithReply {
public uint SequenceId { get; }
}

View File

@ -1,4 +1,5 @@
using Phantom.Common.Logging;
using Phantom.Common.Data.Replies;
using Phantom.Common.Logging;
using Phantom.Common.Messages.ToAgent;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
@ -6,23 +7,24 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages;
public static class MessageRegistries {
public static MessageRegistry<IMessageToAgentListener, IMessageToAgent> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry:ToAgent"));
public static MessageRegistry<IMessageToServerListener, IMessageToServer> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry:ToAgent"));
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
static MessageRegistries() {
ToAgent.Add<RegisterAgentSuccessMessage>(0);
ToAgent.Add<RegisterAgentFailureMessage>(1);
ToAgent.Add<ConfigureInstanceMessage>(2);
ToAgent.Add<LaunchInstanceMessage>(3);
ToAgent.Add<StopInstanceMessage>(4);
ToAgent.Add<SendCommandToInstanceMessage>(5);
ToAgent.Add<RegisterAgentSuccessMessage, NoReply>(0);
ToAgent.Add<RegisterAgentFailureMessage, NoReply>(1);
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
ToAgent.Add<ReplyMessage, NoReply>(127);
ToServer.Add<RegisterAgentMessage>(0);
ToServer.Add<UnregisterAgentMessage>(1);
ToServer.Add<AgentIsAliveMessage>(2);
ToServer.Add<AdvertiseJavaRuntimesMessage>(3);
ToServer.Add<ReportInstanceStatusMessage>(4);
ToServer.Add<InstanceOutputMessage>(5);
ToServer.Add<SimpleReplyMessage>(127);
ToServer.Add<RegisterAgentMessage, NoReply>(0);
ToServer.Add<UnregisterAgentMessage, NoReply>(1);
ToServer.Add<AgentIsAliveMessage, NoReply>(2);
ToServer.Add<AdvertiseJavaRuntimesMessage, NoReply>(3);
ToServer.Add<ReportInstanceStatusMessage, NoReply>(4);
ToServer.Add<InstanceOutputMessage, NoReply>(5);
ToServer.Add<ReplyMessage, NoReply>(127);
}
}

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent;
@ -7,8 +8,8 @@ namespace Phantom.Common.Messages.ToAgent;
public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
) : IMessageToAgent, IMessageWithReply {
public Task Accept(IMessageToAgentListener listener) {
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);
}
}

View File

@ -1,4 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent;
@ -6,8 +7,8 @@ namespace Phantom.Common.Messages.ToAgent;
public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid
) : IMessageToAgent, IMessageWithReply {
public Task Accept(IMessageToAgentListener listener) {
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleLaunchInstance(this);
}
}

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToAgent;
@ -7,7 +8,7 @@ namespace Phantom.Common.Messages.ToAgent;
public sealed partial record RegisterAgentFailureMessage(
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
) : IMessageToAgent {
public Task Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentFailureResult(this);
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentFailure(this);
}
}

View File

@ -1,6 +1,7 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToAgent;
@ -8,7 +9,7 @@ namespace Phantom.Common.Messages.ToAgent;
public sealed partial record RegisterAgentSuccessMessage(
[property: MemoryPackOrder(0)] ImmutableArray<InstanceConfiguration> InitialInstances
) : IMessageToAgent {
public Task Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentSuccessResult(this);
public Task<NoReply> Accept(IMessageToAgentListener listener) {
return listener.HandleRegisterAgentSuccess(this);
}
}

View File

@ -1,4 +1,5 @@
using MemoryPack;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent;
@ -7,8 +8,8 @@ public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] string Command
) : IMessageToAgent, IMessageWithReply {
public Task Accept(IMessageToAgentListener listener) {
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleSendCommandToInstance(this);
}
}

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
namespace Phantom.Common.Messages.ToAgent;
@ -8,8 +9,8 @@ public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy
) : IMessageToAgent, IMessageWithReply {
public Task Accept(IMessageToAgentListener listener) {
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleStopInstance(this);
}
}

View File

@ -1,6 +1,7 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Common.Data.Java;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
@ -8,7 +9,7 @@ namespace Phantom.Common.Messages.ToServer;
public sealed partial record AdvertiseJavaRuntimesMessage(
[property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes
) : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleAdvertiseJavaRuntimes(this);
}
}

View File

@ -1,10 +1,11 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable]
public sealed partial record AgentIsAliveMessage : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleAgentIsAlive(this);
}
}

View File

@ -1,5 +1,6 @@
using System.Collections.Immutable;
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
@ -8,7 +9,7 @@ public sealed partial record InstanceOutputMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
) : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleInstanceOutput(this);
}
}

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
@ -8,7 +9,7 @@ public sealed partial record RegisterAgentMessage(
[property: MemoryPackOrder(0)] AgentAuthToken AuthToken,
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
) : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleRegisterAgent(this);
}
}

View File

@ -1,5 +1,6 @@
using MemoryPack;
using Phantom.Common.Data.Instance;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
@ -8,7 +9,7 @@ public sealed partial record ReportInstanceStatusMessage(
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus
) : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleReportInstanceStatus(this);
}
}

View File

@ -1,22 +0,0 @@
using System.Runtime.CompilerServices;
using MemoryPack;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable]
public sealed partial record SimpleReplyMessage(
[property: MemoryPackOrder(0)] uint SequenceId,
[property: MemoryPackOrder(1)] int EnumValue
) : IMessageToServer {
public static SimpleReplyMessage FromEnum<TEnum>(uint sequenceId, TEnum enumValue) where TEnum : Enum {
if (Unsafe.SizeOf<TEnum>() != Unsafe.SizeOf<int>()) {
throw new ArgumentException("Enum type " + typeof(TEnum).Name + " is not compatible with int.", nameof(TEnum));
}
return new SimpleReplyMessage(sequenceId, Unsafe.As<TEnum, int>(ref enumValue));
}
public Task Accept(IMessageToServerListener listener) {
return listener.HandleSimpleReply(this);
}
}

View File

@ -1,4 +1,5 @@
using MemoryPack;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
@ -6,7 +7,7 @@ namespace Phantom.Common.Messages.ToServer;
public sealed partial record UnregisterAgentMessage(
[property: MemoryPackOrder(0)] Guid AgentGuid
) : IMessageToServer {
public Task Accept(IMessageToServerListener listener) {
public Task<NoReply> Accept(IMessageToServerListener listener) {
return listener.HandleUnregisterAgent(this);
}
}

View File

@ -1,11 +1,11 @@
<Project>
<ItemGroup>
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.0-rc.1.22427.2" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.0-rc.1.22427.2" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.0-rc.1.22427.2" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.0-rc.1.22426.7" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.0-rc.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.1" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.1" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.1" />
</ItemGroup>
<ItemGroup>
@ -13,22 +13,22 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="MemoryPack" Version="1.4.1" />
<PackageReference Update="MemoryPack" Version="1.9.7" />
<PackageReference Update="NetMQ" Version="4.0.1.10" />
</ItemGroup>
<ItemGroup>
<PackageReference Update="Serilog" Version="2.12.0" />
<PackageReference Update="Serilog.AspNetCore" Version="6.0.1" />
<PackageReference Update="Serilog.AspNetCore" Version="6.1.0" />
<PackageReference Update="Serilog.Sinks.Console" Version="4.1.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Update="coverlet.collector" Version="3.1.2" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Update="coverlet.collector" Version="3.2.0" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Update="NUnit" Version="3.13.3" />
<PackageReference Update="NUnit.Analyzers" Version="3.3.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.2.1" />
<PackageReference Update="NUnit.Analyzers" Version="3.5.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.3.1" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,8 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Server.Rpc;
@ -8,12 +10,15 @@ public sealed class RpcClientConnection {
private readonly ServerSocket socket;
private readonly uint routingId;
private readonly MessageReplyTracker messageReplyTracker;
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
private bool isClosed;
internal RpcClientConnection(ServerSocket socket, uint routingId) {
internal RpcClientConnection(ServerSocket socket, uint routingId, MessageReplyTracker messageReplyTracker) {
this.socket = socket;
this.routingId = routingId;
this.messageReplyTracker = messageReplyTracker;
}
public bool IsSame(RpcClientConnection other) {
@ -29,14 +34,32 @@ public sealed class RpcClientConnection {
}
}
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
if (isClosed) {
return; // TODO
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<TReply> {
return isClosed ? Array.Empty<byte>() : MessageRegistries.ToAgent.Write<TMessage, TReply>(message).ToArray();
}
byte[] bytes = MessageRegistries.ToAgent.Write(message).ToArray();
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
var bytes = WriteBytes<TMessage, NoReply>(message);
if (bytes.Length > 0) {
await socket.SendAsync(routingId, bytes);
}
}
public async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
var sequenceId = messageReplyTracker.RegisterReply();
var message = messageFactory(sequenceId);
var bytes = WriteBytes<TMessage, TReply>(message);
if (bytes.Length == 0) {
messageReplyTracker.ForgetReply(sequenceId);
return null;
}
await socket.SendAsync(routingId, bytes);
return await messageReplyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
}
public void Receive(ReplyMessage message) {
messageReplyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
}
}

View File

@ -2,6 +2,7 @@
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Runtime;
using Serilog.Events;
using ILogger = Serilog.ILogger;
@ -38,7 +39,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
logger.Information("ZeroMQ server initialized, listening for agent connections on port {Port}.", config.Port);
}
protected override void Run(ServerSocket socket, TaskManager taskManager) {
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.Logger;
var clients = new Dictionary<ulong, Client>();
@ -60,19 +61,17 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
continue;
}
var connection = new RpcClientConnection(socket, routingId);
var connection = new RpcClientConnection(socket, routingId, replyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(connection, listenerFactory);
client = new Client(connection, listenerFactory, logger, taskManager, cancellationToken);
clients[routingId] = client;
}
LogMessageType(logger, routingId, data);
MessageRegistries.ToServer.Handle(data, client.Listener, taskManager, cancellationToken);
MessageRegistries.ToServer.Handle(data, client);
if (client.Listener.IsDisposed) {
client.Connection.Close();
}
client.CloseIfDisposed();
}
foreach (var client in clients.Values) {
@ -102,13 +101,21 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
return false;
}
private readonly struct Client {
private sealed class Client : MessageHandler<IMessageToServerListener> {
public RpcClientConnection Connection { get; }
public IMessageToServerListener Listener { get; }
public Client(RpcClientConnection connection, Func<RpcClientConnection, IMessageToServerListener> listenerFactory) {
public Client(RpcClientConnection connection, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listenerFactory(connection), logger, taskManager, cancellationToken) {
Connection = connection;
Listener = listenerFactory(connection);
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(new ReplyMessage(sequenceId, serializedReply));
}
public void CloseIfDisposed() {
if (Listener.IsDisposed) {
Connection.Close();
}
}
}
}

View File

@ -18,7 +18,11 @@ sealed class AgentConnection {
connection.Close();
}
public async Task SendMessage<TMessage>(TMessage message) where TMessage : IMessageToAgent {
await connection.Send(message);
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
return connection.Send(message);
}
public Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
}
}

View File

@ -7,7 +7,6 @@ using Phantom.Common.Messages.ToAgent;
using Phantom.Server.Database;
using Phantom.Server.Rpc;
using Phantom.Server.Services.Instances;
using Phantom.Server.Services.Rpc;
using Phantom.Utils.Collections;
using Phantom.Utils.Events;
using Phantom.Utils.Runtime;
@ -114,28 +113,14 @@ public sealed class AgentManager {
}
}
private async Task<bool> SendMessage<TMessage>(Guid guid, TMessage message) where TMessage : IMessageToAgent {
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
if (connection != null) {
await connection.SendMessage(message);
return true;
}
else {
if (connection == null) {
// TODO handle missing agent?
return false;
}
}
internal async Task<int?> SendMessageWithReply<TMessage>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent, IMessageWithReply {
var sequenceId = MessageReplyTracker.Instance.RegisterReply();
var message = messageFactory(sequenceId);
if (!await SendMessage(guid, message)) {
MessageReplyTracker.Instance.ForgetReply(sequenceId);
return null;
}
return await MessageReplyTracker.Instance.WaitForReply(sequenceId, waitForReplyTime, cancellationToken);
return await connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
}
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {

View File

@ -6,10 +6,8 @@ public enum AddInstanceResult {
InstanceNameMustNotBeEmpty,
InstanceMemoryMustNotBeZero,
AgentNotFound,
AgentShuttingDown,
AgentInstanceLimitExceeded,
AgentMemoryLimitExceeded,
AgentCommunicationError,
UnknownError
}
@ -20,10 +18,8 @@ public static class AddInstanceResultExtensions {
AddInstanceResult.InstanceNameMustNotBeEmpty => "Instance name must not be empty.",
AddInstanceResult.InstanceMemoryMustNotBeZero => "Memory must not be 0 MB.",
AddInstanceResult.AgentNotFound => "Agent not found.",
AddInstanceResult.AgentShuttingDown => "Agent is shutting down.",
AddInstanceResult.AgentInstanceLimitExceeded => "Agent instance limit exceeded.",
AddInstanceResult.AgentMemoryLimitExceeded => "Agent memory limit exceeded.",
AddInstanceResult.AgentCommunicationError => "Agent did not reply in time.",
_ => "Unknown error."
};
}

View File

@ -54,21 +54,21 @@ public sealed class InstanceManager {
}
}
public async Task<AddInstanceResult> AddInstance(InstanceConfiguration configuration) {
public async Task<InstanceActionResult<AddInstanceResult>> AddInstance(InstanceConfiguration configuration) {
var agent = agentManager.GetAgent(configuration.AgentGuid);
if (agent == null) {
return AddInstanceResult.AgentNotFound;
return InstanceActionResult.Concrete(AddInstanceResult.AgentNotFound);
}
var instance = new Instance(configuration);
if (!instances.ByGuid.TryAdd(instance.Configuration.InstanceGuid, instance)) {
return AddInstanceResult.InstanceAlreadyExists;
return InstanceActionResult.Concrete(AddInstanceResult.InstanceAlreadyExists);
}
var agentName = agent.Name;
var reply = (ConfigureInstanceResult?) await agentManager.SendMessageWithReply(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10));
if (reply == ConfigureInstanceResult.Success) {
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
if (reply.Is(ConfigureInstanceResult.Success)) {
using (var scope = databaseProvider.CreateScope()) {
InstanceEntity entity = scope.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
@ -87,20 +87,18 @@ public sealed class InstanceManager {
}
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agentName);
return AddInstanceResult.Success;
return InstanceActionResult.Concrete(AddInstanceResult.Success);
}
else {
instances.ByGuid.Remove(configuration.InstanceGuid);
var result = reply switch {
null => AddInstanceResult.AgentCommunicationError,
ConfigureInstanceResult.AgentShuttingDown => AddInstanceResult.AgentShuttingDown,
var result = reply.Map(static result => result switch {
ConfigureInstanceResult.InstanceLimitExceeded => AddInstanceResult.AgentInstanceLimitExceeded,
ConfigureInstanceResult.MemoryLimitExceeded => AddInstanceResult.AgentMemoryLimitExceeded,
_ => AddInstanceResult.UnknownError
};
});
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agentName, result.ToSentence());
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agentName, result.ToSentence(AddInstanceResultExtensions.ToSentence));
return result;
}
}
@ -121,28 +119,28 @@ public sealed class InstanceManager {
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
}
public async Task<LaunchInstanceResult> LaunchInstance(Guid instanceGuid) {
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid instanceGuid) {
var instance = GetInstance(instanceGuid);
if (instance == null) {
return LaunchInstanceResult.InstanceDoesNotExist;
return InstanceActionResult.General<LaunchInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
await SetInstanceShouldLaunchAutomatically(instanceGuid, true);
var reply = (LaunchInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
return reply ?? LaunchInstanceResult.CommunicationError;
var reply = await agentManager.SendMessage<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
public async Task<StopInstanceResult> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
var instance = GetInstance(instanceGuid);
if (instance == null) {
return StopInstanceResult.InstanceDoesNotExist;
return InstanceActionResult.General<StopInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
await SetInstanceShouldLaunchAutomatically(instanceGuid, false);
var reply = (StopInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
return reply ?? StopInstanceResult.CommunicationError;
var reply = await agentManager.SendMessage<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
private async Task SetInstanceShouldLaunchAutomatically(Guid instanceGuid, bool shouldLaunchAutomatically) {
@ -158,14 +156,14 @@ public sealed class InstanceManager {
}
}
public async Task<SendCommandToInstanceResult> SendCommand(Guid instanceGuid, string command) {
public async Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid instanceGuid, string command) {
var instance = GetInstance(instanceGuid);
if (instance != null) {
var reply = (SendCommandToInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
return reply ?? SendCommandToInstanceResult.AgentCommunicationError;
if (instance == null) {
return InstanceActionResult.General<SendCommandToInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
return SendCommandToInstanceResult.InstanceDoesNotExist;
var reply = await agentManager.SendMessage<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
internal ImmutableArray<InstanceConfiguration> GetInstanceConfigurationsForAgent(Guid agentGuid) {

View File

@ -1,58 +0,0 @@
using System.Collections.Concurrent;
using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer;
using Serilog;
namespace Phantom.Server.Services.Rpc;
sealed class MessageReplyTracker {
private static readonly ILogger Logger = PhantomLogger.Create<MessageReplyTracker>();
public static MessageReplyTracker Instance { get; } = new ();
private uint lastSequenceId;
private readonly ConcurrentDictionary<uint, TaskCompletionSource<int?>> simpleReplyTasks = new (4, 16);
private MessageReplyTracker() {}
public uint RegisterReply() {
var sequenceId = Interlocked.Increment(ref lastSequenceId);
simpleReplyTasks[sequenceId] = new TaskCompletionSource<int?>(TaskCreationOptions.None);
return sequenceId;
}
public async Task<int?> WaitForReply(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) {
if (!simpleReplyTasks.TryGetValue(sequenceId, out var completionSource)) {
Logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
return null;
}
try {
return await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
} catch (TimeoutException) {
return null;
} catch (OperationCanceledException) {
return null;
} catch (Exception e) {
Logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
return null;
} finally {
ForgetReply(sequenceId);
}
}
public void ForgetReply(uint sequenceId) {
if (simpleReplyTasks.TryRemove(sequenceId, out var task)) {
task.SetCanceled();
}
}
public void ReceiveReply(SimpleReplyMessage message) {
if (simpleReplyTasks.TryRemove(message.SequenceId, out var task)) {
task.SetResult(message.EnumValue);
}
else {
Logger.Warning("Received a reply with id {SequenceId} but no registered callback.", message.SequenceId);
}
}
}

View File

@ -6,6 +6,7 @@ using Phantom.Common.Messages.ToServer;
using Phantom.Server.Rpc;
using Phantom.Server.Services.Agents;
using Phantom.Server.Services.Instances;
using Phantom.Utils.Rpc.Message;
namespace Phantom.Server.Services.Rpc;
@ -31,7 +32,7 @@ public sealed class MessageToServerListener : IMessageToServerListener {
this.instanceLogManager = instanceLogManager;
}
public async Task HandleRegisterAgent(RegisterAgentMessage message) {
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuid != null && agentGuid != message.AgentInfo.Guid) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
}
@ -40,42 +41,46 @@ public sealed class MessageToServerListener : IMessageToServerListener {
agentGuid = guid;
agentGuidWaiter.SetResult(guid);
}
return NoReply.Instance;
}
private async Task<Guid> WaitForAgentGuid() {
return await agentGuidWaiter.Task.WaitAsync(cancellationToken);
}
public Task HandleUnregisterAgent(UnregisterAgentMessage message) {
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
IsDisposed = true;
if (agentManager.UnregisterAgent(message.AgentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline);
}
return Task.CompletedTask;
return Task.FromResult(NoReply.Instance);
}
public async Task HandleAgentIsAlive(AgentIsAliveMessage message) {
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.NotifyAgentIsAlive(await WaitForAgentGuid());
return NoReply.Instance;
}
public async Task HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentJavaRuntimesManager.Update(await WaitForAgentGuid(), message.Runtimes);
return NoReply.Instance;
}
public Task HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
public Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
instanceManager.SetInstanceState(message.InstanceGuid, message.InstanceStatus);
return Task.CompletedTask;
return Task.FromResult(NoReply.Instance);
}
public Task HandleInstanceOutput(InstanceOutputMessage message) {
public Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message) {
instanceLogManager.AddLines(message.InstanceGuid, message.Lines);
return Task.CompletedTask;
return Task.FromResult(NoReply.Instance);
}
public Task HandleSimpleReply(SimpleReplyMessage message) {
MessageReplyTracker.Instance.ReceiveReply(message);
return Task.CompletedTask;
public Task<NoReply> HandleReply(ReplyMessage message) {
connection.Receive(message);
return Task.FromResult(NoReply.Instance);
}
}

View File

@ -294,12 +294,12 @@
var instance = new InstanceConfiguration(selectedAgent.Agent.Guid, instanceGuid, form.InstanceName, serverPort, rconPort, form.MinecraftVersion, form.MinecraftServerKind, memoryAllocation, javaRuntimeGuid, jvmArguments, LaunchAutomatically: false);
var result = await InstanceManager.AddInstance(instance);
if (result == AddInstanceResult.Success) {
if (result.Is(AddInstanceResult.Success)) {
await AuditLog.AddInstanceCreatedEvent(instance.InstanceGuid);
Nav.NavigateTo("instances/" + instance.InstanceGuid);
}
else {
form.SubmitModel.StopSubmitting(result.ToSentence());
form.SubmitModel.StopSubmitting(result.ToSentence(AddInstanceResultExtensions.ToSentence));
}
}

View File

@ -71,11 +71,11 @@ else {
}
var result = await InstanceManager.LaunchInstance(InstanceGuid);
if (result == LaunchInstanceResult.LaunchInitiated) {
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
await AuditLog.AddInstanceLaunchedEvent(InstanceGuid);
}
else {
lastError = result.ToSentence();
lastError = result.ToSentence(LaunchInstanceResultExtensions.ToSentence);
}
} finally {
isLaunchingInstance = false;

View File

@ -1,6 +1,6 @@
@using Phantom.Common.Data.Replies
@using Phantom.Server.Services.Instances
@using Phantom.Server.Services.Audit
@using Phantom.Server.Services.Instances
@using Phantom.Common.Data.Replies
@inherits PhantomComponent
@inject InstanceManager InstanceManager
@inject AuditLog AuditLog
@ -40,13 +40,13 @@
}
var result = await InstanceManager.SendCommand(InstanceGuid, form.Command);
if (result == SendCommandToInstanceResult.Success) {
if (result.Is(SendCommandToInstanceResult.Success)) {
await AuditLog.AddInstanceCommandExecutedEvent(InstanceGuid, form.Command);
form.Command = string.Empty;
form.SubmitModel.StopSubmitting();
}
else {
form.SubmitModel.StopSubmitting(result.ToSentence());
form.SubmitModel.StopSubmitting(result.ToSentence(SendCommandToInstanceResultExtensions.ToSentence));
}
await commandInputElement.FocusAsync(preventScroll: true);

View File

@ -1,8 +1,8 @@
@using Phantom.Common.Data.Replies
@using Phantom.Server.Services.Instances
@using Phantom.Server.Services.Audit
@using Phantom.Server.Services.Instances
@using System.ComponentModel.DataAnnotations
@using Phantom.Common.Data.Minecraft
@using Phantom.Common.Data.Replies
@inherits PhantomComponent
@inject IJSRuntime Js;
@inject InstanceManager InstanceManager;
@ -57,13 +57,13 @@
}
var result = await InstanceManager.StopInstance(InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds));
if (result == StopInstanceResult.StopInitiated) {
if (result.Is(StopInstanceResult.StopInitiated)) {
await AuditLog.AddInstanceStoppedEvent(InstanceGuid, form.StopInSeconds);
await Js.InvokeVoidAsync("closeModal", ModalId);
form.SubmitModel.StopSubmitting();
}
else {
form.SubmitModel.StopSubmitting(result.ToSentence());
form.SubmitModel.StopSubmitting(result.ToSentence(StopInstanceResultExtensions.ToSentence));
}
}

View File

@ -1,5 +1,6 @@
namespace Phantom.Utils.Rpc.Message;
public interface IMessage<TListener> {
Task Accept(TListener listener);
public interface IMessage<TListener, TReply> {
public uint SequenceId { get; }
Task<TReply> Accept(TListener listener);
}

View File

@ -0,0 +1,40 @@
using Phantom.Utils.Runtime;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
public abstract class MessageHandler<TListener> {
protected TListener Listener { get; }
private readonly ILogger logger;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
this.Listener = listener;
this.logger = logger;
this.taskManager = taskManager;
this.cancellationToken = cancellationToken;
}
internal void Enqueue<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested();
taskManager.Run(async () => {
try {
await Handle<TMessage, TReply>(message);
} catch (Exception e) {
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
}
});
}
private async Task Handle<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
TReply reply = await message.Accept(Listener);
if (reply is not NoReply) {
await SendReply(message.SequenceId, MessageSerializer.Serialize(reply));
}
}
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
}

View File

@ -1,27 +1,26 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using Phantom.Utils.Runtime;
using Serilog;
using Serilog.Events;
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase : class, IMessage<TListener> {
public sealed class MessageRegistry<TListener> {
private const int DefaultBufferSize = 512;
private readonly ILogger logger;
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
private readonly Dictionary<ushort, Func<ReadOnlyMemory<byte>, TMessageBase>> codeToDeserializerMapping = new ();
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TListener>>> codeToHandlerMapping = new ();
public MessageRegistry(ILogger logger) {
this.logger = logger;
}
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
typeToCodeMapping.Add(typeof(TMessage), code);
codeToTypeMapping.Add(code, typeof(TMessage));
codeToDeserializerMapping.Add(code, MessageSerializer.Deserialize<TMessage, TMessageBase, TListener>());
codeToHandlerMapping.Add(code, HandleInternal<TMessage, TReply>);
}
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
@ -34,7 +33,11 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
}
}
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : TMessageBase {
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
return Write<TMessage, NoReply>(message);
}
public ReadOnlySpan<byte> Write<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
logger.Error("Unknown message type {Type}.", typeof(TMessage));
return default;
@ -44,7 +47,7 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
try {
MessageSerializer.WriteCode(buffer, code);
MessageSerializer.Serialize<TMessage, TListener>(buffer, message);
MessageSerializer.Serialize(buffer, message);
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
@ -57,7 +60,7 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
}
}
public void Handle(ReadOnlyMemory<byte> data, TListener listener, TaskManager taskManager, CancellationToken cancellationToken) {
public void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
ushort code;
try {
code = MessageSerializer.ReadCode(ref data);
@ -66,28 +69,23 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
return;
}
if (!codeToDeserializerMapping.TryGetValue(code, out var deserialize)) {
if (!codeToHandlerMapping.TryGetValue(code, out var handle)) {
logger.Error("Unknown message code {Code}.", code);
return;
}
TMessageBase message;
handle(data, code, handler);
}
private void HandleInternal<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
TMessage message;
try {
message = deserialize(data);
message = MessageSerializer.Deserialize<TMessage>(data);
} catch (Exception e) {
logger.Error(e, "Failed to deserialize message with code {Code}.", code);
return;
}
async Task HandleMessage() {
try {
await message.Accept(listener);
} catch (Exception e) {
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
}
}
cancellationToken.ThrowIfCancellationRequested();
taskManager.Run(HandleMessage);
handler.Enqueue<TMessage, TReply>(message);
}
}

View File

@ -0,0 +1,57 @@
using System.Collections.Concurrent;
using Serilog;
namespace Phantom.Utils.Rpc.Message;
public sealed class MessageReplyTracker {
private readonly ILogger logger;
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
private uint lastSequenceId;
internal MessageReplyTracker(ILogger logger) {
this.logger = logger;
}
public uint RegisterReply() {
var sequenceId = Interlocked.Increment(ref lastSequenceId);
replyTasks[sequenceId] = new TaskCompletionSource<byte[]>(TaskCreationOptions.None);
return sequenceId;
}
public async Task<TReply?> WaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
if (!replyTasks.TryGetValue(sequenceId, out var completionSource)) {
logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
return null;
}
try {
byte[] replyBytes = await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
return MessageSerializer.Deserialize<TReply>(replyBytes);
} catch (TimeoutException) {
return null;
} catch (OperationCanceledException) {
return null;
} catch (Exception e) {
logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
return null;
} finally {
ForgetReply(sequenceId);
}
}
public void ForgetReply(uint sequenceId) {
if (replyTasks.TryRemove(sequenceId, out var task)) {
task.SetCanceled();
}
}
public void ReceiveReply(uint sequenceId, byte[] serializedReply) {
if (replyTasks.TryRemove(sequenceId, out var task)) {
task.SetResult(serializedReply);
}
else {
logger.Warning("Received a reply with id {SequenceId} but no registered callback.", sequenceId);
}
}
}

View File

@ -5,14 +5,18 @@ using MemoryPack;
namespace Phantom.Utils.Rpc.Message;
static class MessageSerializer {
private static readonly MemoryPackSerializeOptions SerializerOptions = MemoryPackSerializeOptions.Utf8;
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
public static void Serialize<TMessage, TListener>(IBufferWriter<byte> destination, TMessage message) where TMessage : IMessage<TListener> {
MemoryPackSerializer.Serialize(typeof(TMessage), destination, message, SerializerOptions);
public static byte[] Serialize<T>(T message) {
return MemoryPackSerializer.Serialize(typeof(T), message, SerializerOptions);
}
public static Func<ReadOnlyMemory<byte>, TMessageBase> Deserialize<TMessage, TMessageBase, TListener>() where TMessageBase : IMessage<TListener> where TMessage : TMessageBase {
return static memory => MemoryPackSerializer.Deserialize<TMessage>(memory.Span) ?? throw new NullReferenceException();
public static void Serialize<T>(IBufferWriter<byte> destination, T message) {
MemoryPackSerializer.Serialize(typeof(T), destination, message, SerializerOptions);
}
public static T Deserialize<T>(ReadOnlyMemory<byte> memory) {
return MemoryPackSerializer.Deserialize<T>(memory.Span) ?? throw new NullReferenceException();
}
public static void WriteCode(IBufferWriter<byte> destination, ushort value) {

View File

@ -0,0 +1,5 @@
namespace Phantom.Utils.Rpc.Message;
public readonly struct NoReply {
public static NoReply Instance { get; } = new ();
}

View File

@ -1,4 +1,5 @@
using NetMQ;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Runtime;
using Serilog;
@ -25,6 +26,7 @@ static class RpcRuntime {
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
private readonly TSocket socket;
private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager;
private readonly ILogger logger;
@ -33,6 +35,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
RpcRuntime.SetDefaultSocketOptions(socket.Options);
this.socket = socket;
this.logger = logger;
this.replyTracker = new MessageReplyTracker(logger);
this.taskManager = new TaskManager();
}
@ -40,7 +43,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
Connect(socket);
void RunTask() {
Run(socket, taskManager);
Run(socket, replyTracker, taskManager);
}
try {
@ -50,7 +53,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
} finally {
logger.Information("Stopping task manager...");
await taskManager.Stop();
await Disconnect(socket);
await Disconnect();
socket.Dispose();
NetMQConfig.Cleanup();
@ -59,9 +62,9 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
}
protected abstract void Connect(TSocket socket);
protected abstract void Run(TSocket socket, TaskManager taskManager);
protected abstract void Run(TSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager);
protected virtual Task Disconnect(TSocket socket) {
protected virtual Task Disconnect() {
return Task.CompletedTask;
}
}