mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 16:42:54 +01:00
Compare commits
2 Commits
c0243dc749
...
2cc7975193
Author | SHA1 | Date | |
---|---|---|---|
2cc7975193 | |||
6472134f9a |
@ -1,5 +1,4 @@
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog;
|
||||
@ -11,11 +10,11 @@ sealed class KeepAliveLoop {
|
||||
|
||||
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
|
||||
|
||||
private readonly ClientSocket socket;
|
||||
private readonly RpcServerConnection connection;
|
||||
private readonly CancellationTokenSource cancellationTokenSource = new ();
|
||||
|
||||
public KeepAliveLoop(ClientSocket socket, TaskManager taskManager) {
|
||||
this.socket = socket;
|
||||
public KeepAliveLoop(RpcServerConnection connection, TaskManager taskManager) {
|
||||
this.connection = connection;
|
||||
taskManager.Run(Run);
|
||||
}
|
||||
|
||||
@ -26,7 +25,7 @@ sealed class KeepAliveLoop {
|
||||
try {
|
||||
while (true) {
|
||||
await Task.Delay(KeepAliveInterval, cancellationToken);
|
||||
await socket.SendMessage(new AgentIsAliveMessage());
|
||||
await connection.Send(new AgentIsAliveMessage());
|
||||
}
|
||||
} catch (OperationCanceledException) {
|
||||
// Ignore.
|
||||
|
@ -1,19 +0,0 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
||||
public static class RpcExtensions {
|
||||
internal static async Task SendMessage<TMessage>(this ClientSocket socket, TMessage message) where TMessage : IMessageToServer {
|
||||
byte[] bytes = MessageRegistries.ToServer.Write(message).ToArray();
|
||||
if (bytes.Length > 0) {
|
||||
await socket.SendAsync(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public static Task SendSimpleReply<TMessage, TReplyEnum>(this ClientSocket socket, TMessage message, TReplyEnum reply) where TMessage : IMessageWithReply where TReplyEnum : Enum {
|
||||
return SendMessage(socket, SimpleReplyMessage.FromEnum(message.SequenceId, reply));
|
||||
}
|
||||
}
|
@ -4,6 +4,7 @@ using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
@ -11,7 +12,7 @@ using Serilog.Events;
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
||||
public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
||||
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<ClientSocket, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||
var socket = new ClientSocket();
|
||||
var options = socket.Options;
|
||||
|
||||
@ -24,12 +25,12 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
||||
|
||||
private readonly RpcConfiguration config;
|
||||
private readonly Guid agentGuid;
|
||||
private readonly Func<ClientSocket, IMessageToAgentListener> messageListenerFactory;
|
||||
private readonly Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory;
|
||||
|
||||
private readonly SemaphoreSlim disconnectSemaphore;
|
||||
private readonly CancellationToken receiveCancellationToken;
|
||||
|
||||
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<ClientSocket, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) {
|
||||
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) {
|
||||
this.config = config;
|
||||
this.agentGuid = agentGuid;
|
||||
this.messageListenerFactory = messageListenerFactory;
|
||||
@ -46,12 +47,13 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
||||
logger.Information("ZeroMQ client ready.");
|
||||
}
|
||||
|
||||
protected override void Run(ClientSocket socket, TaskManager taskManager) {
|
||||
var logger = config.Logger;
|
||||
var listener = messageListenerFactory(socket);
|
||||
protected override void Run(ClientSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
||||
var connection = new RpcServerConnection(socket, replyTracker);
|
||||
ServerMessaging.SetCurrentConnection(connection);
|
||||
|
||||
ServerMessaging.SetCurrentSocket(socket);
|
||||
var keepAliveLoop = new KeepAliveLoop(socket, taskManager);
|
||||
var logger = config.Logger;
|
||||
var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
|
||||
var keepAliveLoop = new KeepAliveLoop(connection, taskManager);
|
||||
|
||||
try {
|
||||
while (!receiveCancellationToken.IsCancellationRequested) {
|
||||
@ -60,7 +62,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
||||
LogMessageType(logger, data);
|
||||
|
||||
if (data.Length > 0) {
|
||||
MessageRegistries.ToAgent.Handle(data, listener, taskManager, receiveCancellationToken);
|
||||
MessageRegistries.ToAgent.Handle(data, handler);
|
||||
}
|
||||
}
|
||||
} catch (OperationCanceledException) {
|
||||
@ -86,11 +88,19 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task Disconnect(ClientSocket socket) {
|
||||
protected override async Task Disconnect() {
|
||||
var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||
var finishedTask = await Task.WhenAny(socket.SendMessage(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
|
||||
var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
|
||||
if (finishedTask == unregisterTimeoutTask) {
|
||||
config.Logger.Error("Timed out communicating agent shutdown with the server.");
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class MessageToAgentHandler : MessageHandler<IMessageToAgentListener> {
|
||||
public MessageToAgentHandler(IMessageToAgentListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {}
|
||||
|
||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||
return ServerMessaging.Send(new ReplyMessage(sequenceId, serializedReply));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
46
Agent/Phantom.Agent.Rpc/RpcServerConnection.cs
Normal file
46
Agent/Phantom.Agent.Rpc/RpcServerConnection.cs
Normal file
@ -0,0 +1,46 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Agent.Rpc;
|
||||
|
||||
public sealed class RpcServerConnection {
|
||||
private readonly ClientSocket socket;
|
||||
private readonly MessageReplyTracker replyTracker;
|
||||
|
||||
internal RpcServerConnection(ClientSocket socket, MessageReplyTracker replyTracker) {
|
||||
this.socket = socket;
|
||||
this.replyTracker = replyTracker;
|
||||
}
|
||||
|
||||
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToServer<TReply> {
|
||||
return MessageRegistries.ToServer.Write<TMessage, TReply>(message).ToArray();
|
||||
}
|
||||
|
||||
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
|
||||
var bytes = WriteBytes<TMessage, NoReply>(message);
|
||||
if (bytes.Length > 0) {
|
||||
await socket.SendAsync(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
||||
var sequenceId = replyTracker.RegisterReply();
|
||||
var message = messageFactory(sequenceId);
|
||||
|
||||
var bytes = WriteBytes<TMessage, TReply>(message);
|
||||
if (bytes.Length == 0) {
|
||||
replyTracker.ForgetReply(sequenceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
await socket.SendAsync(bytes);
|
||||
return await replyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
|
||||
public void Receive(ReplyMessage message) {
|
||||
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
||||
}
|
||||
}
|
@ -1,5 +1,4 @@
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages;
|
||||
using Serilog;
|
||||
|
||||
@ -8,23 +7,28 @@ namespace Phantom.Agent.Rpc;
|
||||
public static class ServerMessaging {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create(typeof(ServerMessaging));
|
||||
|
||||
private static ClientSocket? CurrentSocket { get; set; }
|
||||
private static readonly object SetCurrentSocketLock = new ();
|
||||
private static RpcServerConnection? CurrentConnection { get; set; }
|
||||
private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
|
||||
|
||||
internal static void SetCurrentSocket(ClientSocket socket) {
|
||||
lock (SetCurrentSocketLock) {
|
||||
if (CurrentSocket != null) {
|
||||
throw new InvalidOperationException("Server socket can only be set once.");
|
||||
private static readonly object SetCurrentConnectionLock = new ();
|
||||
|
||||
internal static void SetCurrentConnection(RpcServerConnection connection) {
|
||||
lock (SetCurrentConnectionLock) {
|
||||
if (CurrentConnection != null) {
|
||||
throw new InvalidOperationException("Server connection can only be set once.");
|
||||
}
|
||||
|
||||
CurrentSocket = socket;
|
||||
CurrentConnection = connection;
|
||||
}
|
||||
|
||||
Logger.Information("Server socket ready.");
|
||||
Logger.Information("Server connection ready.");
|
||||
}
|
||||
|
||||
public static async Task SendMessage<TMessage>(TMessage message) where TMessage : IMessageToServer {
|
||||
var currentSocket = CurrentSocket ?? throw new InvalidOperationException("Server socket not ready.");
|
||||
await currentSocket.SendMessage(message);
|
||||
public static Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
|
||||
return CurrentConnectionOrThrow.Send(message);
|
||||
}
|
||||
|
||||
public static Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
||||
return CurrentConnectionOrThrow.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ sealed class Instance : IDisposable {
|
||||
}
|
||||
|
||||
private async Task ReportLastStatus() {
|
||||
await ServerMessaging.SendMessage(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
||||
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
||||
}
|
||||
|
||||
private void TransitionState(IInstanceState newState) {
|
||||
@ -143,7 +143,7 @@ sealed class Instance : IDisposable {
|
||||
instance.launchServices.TaskManager.Run(async () => {
|
||||
if (myStatusUpdateCounter == statusUpdateCounter) {
|
||||
instance.currentStatus = newStatus;
|
||||
await ServerMessaging.SendMessage(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
|
||||
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ sealed class InstanceLogSender {
|
||||
|
||||
private async Task SendOutputToServer(ImmutableArray<string> lines) {
|
||||
if (!lines.IsEmpty) {
|
||||
await ServerMessaging.SendMessage(new InstanceOutputMessage(instanceGuid, lines));
|
||||
await ServerMessaging.Send(new InstanceOutputMessage(instanceGuid, lines));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
using Phantom.Agent.Minecraft.Instance;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using Phantom.Agent.Minecraft.Instance;
|
||||
using Phantom.Agent.Minecraft.Java;
|
||||
using Phantom.Agent.Minecraft.Launcher;
|
||||
using Phantom.Agent.Minecraft.Launcher.Types;
|
||||
@ -40,11 +41,31 @@ sealed class InstanceSessionManager : IDisposable {
|
||||
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
||||
}
|
||||
|
||||
public async Task<ConfigureInstanceResult> Configure(InstanceConfiguration configuration) {
|
||||
[SuppressMessage("ReSharper", "ConvertIfStatementToReturnStatement")]
|
||||
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRunWithInstance<T>(Guid instanceGuid, Func<Instance, Task<T>> func) {
|
||||
try {
|
||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||
} catch (OperationCanceledException) {
|
||||
return ConfigureInstanceResult.AgentShuttingDown;
|
||||
return InstanceActionResult.General<T>(InstanceActionGeneralResult.AgentShuttingDown);
|
||||
}
|
||||
|
||||
try {
|
||||
if (!instances.TryGetValue(instanceGuid, out var instance)) {
|
||||
return InstanceActionResult.General<T>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||
}
|
||||
else {
|
||||
return InstanceActionResult.Concrete(await func(instance));
|
||||
}
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration) {
|
||||
try {
|
||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||
} catch (OperationCanceledException) {
|
||||
return InstanceActionResult.General<ConfigureInstanceResult>(InstanceActionGeneralResult.AgentShuttingDown);
|
||||
}
|
||||
|
||||
var instanceGuid = configuration.InstanceGuid;
|
||||
@ -52,12 +73,12 @@ sealed class InstanceSessionManager : IDisposable {
|
||||
try {
|
||||
var otherInstances = instances.Values.Where(inst => inst.Configuration.InstanceGuid != instanceGuid).ToArray();
|
||||
if (otherInstances.Length + 1 > agentInfo.MaxInstances) {
|
||||
return ConfigureInstanceResult.InstanceLimitExceeded;
|
||||
return InstanceActionResult.Concrete(ConfigureInstanceResult.InstanceLimitExceeded);
|
||||
}
|
||||
|
||||
var availableMemory = agentInfo.MaxMemory - otherInstances.Aggregate(RamAllocationUnits.Zero, static (total, instance) => total + instance.Configuration.MemoryAllocation);
|
||||
if (availableMemory < configuration.MemoryAllocation) {
|
||||
return ConfigureInstanceResult.MemoryLimitExceeded;
|
||||
return InstanceActionResult.Concrete(ConfigureInstanceResult.MemoryLimitExceeded);
|
||||
}
|
||||
|
||||
var heapMegabytes = configuration.MemoryAllocation.InMegabytes;
|
||||
@ -93,70 +114,22 @@ sealed class InstanceSessionManager : IDisposable {
|
||||
await instance.Launch(shutdownCancellationToken);
|
||||
}
|
||||
|
||||
return ConfigureInstanceResult.Success;
|
||||
return InstanceActionResult.Concrete(ConfigureInstanceResult.Success);
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<LaunchInstanceResult> Launch(Guid instanceGuid) {
|
||||
try {
|
||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||
} catch (OperationCanceledException) {
|
||||
return LaunchInstanceResult.AgentShuttingDown;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!instances.TryGetValue(instanceGuid, out var instance)) {
|
||||
return LaunchInstanceResult.InstanceDoesNotExist;
|
||||
}
|
||||
else {
|
||||
return await instance.Launch(shutdownCancellationToken);
|
||||
}
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
public Task<InstanceActionResult<LaunchInstanceResult>> Launch(Guid instanceGuid) {
|
||||
return AcquireSemaphoreAndRunWithInstance(instanceGuid, instance => instance.Launch(shutdownCancellationToken));
|
||||
}
|
||||
|
||||
public async Task<StopInstanceResult> Stop(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
||||
try {
|
||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||
} catch (OperationCanceledException) {
|
||||
return StopInstanceResult.AgentShuttingDown;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!instances.TryGetValue(instanceGuid, out var instance)) {
|
||||
return StopInstanceResult.InstanceDoesNotExist;
|
||||
}
|
||||
else {
|
||||
return await instance.Stop(stopStrategy);
|
||||
}
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
public Task<InstanceActionResult<StopInstanceResult>> Stop(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
||||
return AcquireSemaphoreAndRunWithInstance(instanceGuid, instance => instance.Stop(stopStrategy));
|
||||
}
|
||||
|
||||
public async Task<SendCommandToInstanceResult> SendCommand(Guid instanceGuid, string command) {
|
||||
try {
|
||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||
} catch (OperationCanceledException) {
|
||||
return SendCommandToInstanceResult.AgentShuttingDown;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!instances.TryGetValue(instanceGuid, out var instance)) {
|
||||
return SendCommandToInstanceResult.InstanceDoesNotExist;
|
||||
}
|
||||
|
||||
if (!await instance.SendCommand(command, shutdownCancellationToken)) {
|
||||
return SendCommandToInstanceResult.UnknownError;
|
||||
}
|
||||
|
||||
return SendCommandToInstanceResult.Success;
|
||||
} finally {
|
||||
semaphore.Release();
|
||||
}
|
||||
public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid instanceGuid, string command) {
|
||||
return AcquireSemaphoreAndRunWithInstance(instanceGuid, async instance => await instance.SendCommand(command, shutdownCancellationToken) ? SendCommandToInstanceResult.Success : SendCommandToInstanceResult.UnknownError);
|
||||
}
|
||||
|
||||
public async Task StopAll() {
|
||||
|
@ -1,10 +1,10 @@
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Agent.Rpc;
|
||||
using Phantom.Agent.Rpc;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToAgent;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Agent.Services.Rpc;
|
||||
@ -12,32 +12,34 @@ namespace Phantom.Agent.Services.Rpc;
|
||||
public sealed class MessageListener : IMessageToAgentListener {
|
||||
private static ILogger Logger { get; } = PhantomLogger.Create<MessageListener>();
|
||||
|
||||
private readonly ClientSocket socket;
|
||||
private readonly RpcServerConnection connection;
|
||||
private readonly AgentServices agent;
|
||||
private readonly CancellationTokenSource shutdownTokenSource;
|
||||
|
||||
public MessageListener(ClientSocket socket, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
|
||||
this.socket = socket;
|
||||
public MessageListener(RpcServerConnection connection, AgentServices agent, CancellationTokenSource shutdownTokenSource) {
|
||||
this.connection = connection;
|
||||
this.agent = agent;
|
||||
this.shutdownTokenSource = shutdownTokenSource;
|
||||
}
|
||||
|
||||
public async Task HandleRegisterAgentSuccessResult(RegisterAgentSuccessMessage message) {
|
||||
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
|
||||
Logger.Information("Agent authentication successful.");
|
||||
|
||||
foreach (var instanceInfo in message.InitialInstances) {
|
||||
if (await agent.InstanceSessionManager.Configure(instanceInfo) != ConfigureInstanceResult.Success) {
|
||||
var result = await agent.InstanceSessionManager.Configure(instanceInfo);
|
||||
if (!result.Is(ConfigureInstanceResult.Success)) {
|
||||
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", instanceInfo.InstanceName, instanceInfo.InstanceGuid);
|
||||
|
||||
shutdownTokenSource.Cancel();
|
||||
return;
|
||||
return NoReply.Instance;
|
||||
}
|
||||
}
|
||||
|
||||
await ServerMessaging.SendMessage(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
|
||||
await ServerMessaging.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
|
||||
return NoReply.Instance;
|
||||
}
|
||||
|
||||
public Task HandleRegisterAgentFailureResult(RegisterAgentFailureMessage message) {
|
||||
public Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message) {
|
||||
string errorMessage = message.FailureKind switch {
|
||||
RegisterAgentFailure.ConnectionAlreadyHasAnAgent => "This connection already has an associated agent.",
|
||||
RegisterAgentFailure.InvalidToken => "Invalid token.",
|
||||
@ -47,22 +49,27 @@ public sealed class MessageListener : IMessageToAgentListener {
|
||||
Logger.Fatal("Agent authentication failed: {Error}", errorMessage);
|
||||
Environment.Exit(1);
|
||||
|
||||
return Task.CompletedTask;
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
|
||||
public async Task HandleConfigureInstance(ConfigureInstanceMessage message) {
|
||||
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Configure(message.Configuration));
|
||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
|
||||
return await agent.InstanceSessionManager.Configure(message.Configuration);
|
||||
}
|
||||
|
||||
public async Task HandleLaunchInstance(LaunchInstanceMessage message) {
|
||||
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Launch(message.InstanceGuid));
|
||||
public async Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
|
||||
return await agent.InstanceSessionManager.Launch(message.InstanceGuid);
|
||||
}
|
||||
|
||||
public async Task HandleStopInstance(StopInstanceMessage message) {
|
||||
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy));
|
||||
public async Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
|
||||
return await agent.InstanceSessionManager.Stop(message.InstanceGuid, message.StopStrategy);
|
||||
}
|
||||
|
||||
public async Task HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
||||
await socket.SendSimpleReply(message, await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command));
|
||||
public async Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
||||
return await agent.InstanceSessionManager.SendCommand(message.InstanceGuid, message.Command);
|
||||
}
|
||||
|
||||
public Task<NoReply> HandleReply(ReplyMessage message) {
|
||||
connection.Receive(message);
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
using System.Reflection;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Agent;
|
||||
using Phantom.Agent.Rpc;
|
||||
using Phantom.Agent.Services;
|
||||
@ -45,8 +44,8 @@ try {
|
||||
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
|
||||
var agentServices = new AgentServices(agentInfo, folders);
|
||||
|
||||
MessageListener MessageListenerFactory(ClientSocket socket) {
|
||||
return new MessageListener(socket, agentServices, shutdownCancellationTokenSource);
|
||||
MessageListener MessageListenerFactory(RpcServerConnection connection) {
|
||||
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);
|
||||
}
|
||||
|
||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
public enum ConfigureInstanceResult {
|
||||
Success,
|
||||
AgentShuttingDown,
|
||||
InstanceLimitExceeded,
|
||||
MemoryLimitExceeded
|
||||
}
|
||||
|
@ -0,0 +1,8 @@
|
||||
namespace Phantom.Common.Data.Replies;
|
||||
|
||||
public enum InstanceActionGeneralResult : byte {
|
||||
None,
|
||||
AgentShuttingDown,
|
||||
AgentIsNotResponding,
|
||||
InstanceDoesNotExist
|
||||
}
|
41
Common/Phantom.Common.Data/Replies/InstanceActionResult.cs
Normal file
41
Common/Phantom.Common.Data/Replies/InstanceActionResult.cs
Normal file
@ -0,0 +1,41 @@
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Common.Data.Replies;
|
||||
|
||||
[MemoryPackable]
|
||||
public sealed partial record InstanceActionResult<T>(
|
||||
[property: MemoryPackOrder(0)] InstanceActionGeneralResult GeneralResult,
|
||||
[property: MemoryPackOrder(1)] T? ConcreteResult
|
||||
) {
|
||||
public bool Is(T? concreteResult) {
|
||||
return GeneralResult == InstanceActionGeneralResult.None && EqualityComparer<T>.Default.Equals(ConcreteResult, concreteResult);
|
||||
}
|
||||
|
||||
public InstanceActionResult<T2> Map<T2>(Func<T, T2> mapper) {
|
||||
return new InstanceActionResult<T2>(GeneralResult, ConcreteResult is not null ? mapper(ConcreteResult) : default);
|
||||
}
|
||||
|
||||
public string ToSentence(Func<T, string> concreteResultToSentence) {
|
||||
return GeneralResult switch {
|
||||
InstanceActionGeneralResult.None => concreteResultToSentence(ConcreteResult!),
|
||||
InstanceActionGeneralResult.AgentShuttingDown => "Agent is shutting down.",
|
||||
InstanceActionGeneralResult.AgentIsNotResponding => "Agent is not responding.",
|
||||
InstanceActionGeneralResult.InstanceDoesNotExist => "Instance does not exist.",
|
||||
_ => "Unknown result."
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public static class InstanceActionResult {
|
||||
public static InstanceActionResult<T> General<T>(InstanceActionGeneralResult generalResult) {
|
||||
return new InstanceActionResult<T>(generalResult, default);
|
||||
}
|
||||
|
||||
public static InstanceActionResult<T> Concrete<T>(T? concreteResult) {
|
||||
return new InstanceActionResult<T>(InstanceActionGeneralResult.None, concreteResult);
|
||||
}
|
||||
|
||||
public static InstanceActionResult<T> DidNotReplyIfNull<T>(this InstanceActionResult<T>? result) {
|
||||
return result ?? General<T>(InstanceActionGeneralResult.AgentIsNotResponding);
|
||||
}
|
||||
}
|
@ -2,12 +2,9 @@
|
||||
|
||||
public enum LaunchInstanceResult {
|
||||
LaunchInitiated,
|
||||
AgentShuttingDown,
|
||||
InstanceDoesNotExist,
|
||||
InstanceAlreadyLaunching,
|
||||
InstanceAlreadyRunning,
|
||||
InstanceIsStopping,
|
||||
CommunicationError,
|
||||
UnknownError
|
||||
}
|
||||
|
||||
@ -15,12 +12,9 @@ public static class LaunchInstanceResultExtensions {
|
||||
public static string ToSentence(this LaunchInstanceResult reason) {
|
||||
return reason switch {
|
||||
LaunchInstanceResult.LaunchInitiated => "Launch initiated.",
|
||||
LaunchInstanceResult.AgentShuttingDown => "Agent is shutting down.",
|
||||
LaunchInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
|
||||
LaunchInstanceResult.InstanceAlreadyLaunching => "Instance is already launching.",
|
||||
LaunchInstanceResult.InstanceAlreadyRunning => "Instance is already running.",
|
||||
LaunchInstanceResult.InstanceIsStopping => "Instance is stopping.",
|
||||
LaunchInstanceResult.CommunicationError => "Communication error.",
|
||||
_ => "Unknown error."
|
||||
};
|
||||
}
|
||||
|
@ -2,20 +2,14 @@
|
||||
|
||||
public enum SendCommandToInstanceResult {
|
||||
Success,
|
||||
InstanceDoesNotExist,
|
||||
AgentShuttingDown,
|
||||
AgentCommunicationError,
|
||||
UnknownError
|
||||
}
|
||||
|
||||
public static class SendCommandToInstanceResultExtensions {
|
||||
public static string ToSentence(this SendCommandToInstanceResult reason) {
|
||||
return reason switch {
|
||||
SendCommandToInstanceResult.Success => "Command sent.",
|
||||
SendCommandToInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
|
||||
SendCommandToInstanceResult.AgentShuttingDown => "Agent is shutting down.",
|
||||
SendCommandToInstanceResult.AgentCommunicationError => "Agent did not reply in time.",
|
||||
_ => "Unknown error."
|
||||
SendCommandToInstanceResult.Success => "Command sent.",
|
||||
_ => "Unknown error."
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -2,11 +2,8 @@
|
||||
|
||||
public enum StopInstanceResult {
|
||||
StopInitiated,
|
||||
AgentShuttingDown,
|
||||
InstanceDoesNotExist,
|
||||
InstanceAlreadyStopping,
|
||||
InstanceAlreadyStopped,
|
||||
CommunicationError,
|
||||
UnknownError
|
||||
}
|
||||
|
||||
@ -14,11 +11,8 @@ public static class StopInstanceResultExtensions {
|
||||
public static string ToSentence(this StopInstanceResult reason) {
|
||||
return reason switch {
|
||||
StopInstanceResult.StopInitiated => "Stopping initiated.",
|
||||
StopInstanceResult.AgentShuttingDown => "Agent is shutting down.",
|
||||
StopInstanceResult.InstanceDoesNotExist => "Instance does not exist.",
|
||||
StopInstanceResult.InstanceAlreadyStopping => "Instance is already stopping.",
|
||||
StopInstanceResult.InstanceAlreadyStopped => "Instance is already stopped.",
|
||||
StopInstanceResult.CommunicationError => "Communication error.",
|
||||
_ => "Unknown error."
|
||||
};
|
||||
}
|
||||
|
18
Common/Phantom.Common.Messages/BiDirectional/ReplyMessage.cs
Normal file
18
Common/Phantom.Common.Messages/BiDirectional/ReplyMessage.cs
Normal file
@ -0,0 +1,18 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
[MemoryPackable]
|
||||
public sealed partial record ReplyMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] byte[] SerializedReply
|
||||
) : IMessageToServer, IMessageToAgent {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleReply(this);
|
||||
}
|
||||
|
||||
public Task<NoReply> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleReply(this);
|
||||
}
|
||||
}
|
@ -2,4 +2,8 @@
|
||||
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public interface IMessageToAgent : IMessage<IMessageToAgentListener> {}
|
||||
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
|
||||
|
||||
public interface IMessageToAgent : IMessageToAgent<NoReply> {
|
||||
uint IMessage<IMessageToAgentListener, NoReply>.SequenceId => 0;
|
||||
}
|
||||
|
@ -1,12 +1,16 @@
|
||||
using Phantom.Common.Messages.ToAgent;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Messages.ToAgent;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public interface IMessageToAgentListener {
|
||||
Task HandleRegisterAgentSuccessResult(RegisterAgentSuccessMessage message);
|
||||
Task HandleRegisterAgentFailureResult(RegisterAgentFailureMessage message);
|
||||
Task HandleConfigureInstance(ConfigureInstanceMessage message);
|
||||
Task HandleLaunchInstance(LaunchInstanceMessage message);
|
||||
Task HandleStopInstance(StopInstanceMessage message);
|
||||
Task HandleSendCommandToInstance(SendCommandToInstanceMessage message);
|
||||
Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message);
|
||||
Task<NoReply> HandleRegisterAgentFailure(RegisterAgentFailureMessage message);
|
||||
Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message);
|
||||
Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message);
|
||||
Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message);
|
||||
Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message);
|
||||
Task<NoReply> HandleReply(ReplyMessage message);
|
||||
}
|
||||
|
@ -2,4 +2,8 @@
|
||||
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public interface IMessageToServer : IMessage<IMessageToServerListener> {}
|
||||
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
|
||||
|
||||
public interface IMessageToServer : IMessageToServer<NoReply> {
|
||||
uint IMessage<IMessageToServerListener, NoReply>.SequenceId => 0;
|
||||
}
|
||||
|
@ -1,14 +1,15 @@
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public interface IMessageToServerListener {
|
||||
bool IsDisposed { get; }
|
||||
Task HandleRegisterAgent(RegisterAgentMessage message);
|
||||
Task HandleUnregisterAgent(UnregisterAgentMessage message);
|
||||
Task HandleAgentIsAlive(AgentIsAliveMessage message);
|
||||
Task HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
|
||||
Task HandleReportInstanceStatus(ReportInstanceStatusMessage message);
|
||||
Task HandleInstanceOutput(InstanceOutputMessage message);
|
||||
Task HandleSimpleReply(SimpleReplyMessage message);
|
||||
Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message);
|
||||
Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message);
|
||||
Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message);
|
||||
Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message);
|
||||
Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message);
|
||||
Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message);
|
||||
Task<NoReply> HandleReply(ReplyMessage message);
|
||||
}
|
||||
|
@ -1,5 +0,0 @@
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public interface IMessageWithReply {
|
||||
public uint SequenceId { get; }
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages.ToAgent;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
@ -6,23 +7,24 @@ using Phantom.Utils.Rpc.Message;
|
||||
namespace Phantom.Common.Messages;
|
||||
|
||||
public static class MessageRegistries {
|
||||
public static MessageRegistry<IMessageToAgentListener, IMessageToAgent> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry:ToAgent"));
|
||||
public static MessageRegistry<IMessageToServerListener, IMessageToServer> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
|
||||
public static MessageRegistry<IMessageToAgentListener> ToAgent { get; } = new (PhantomLogger.Create("MessageRegistry:ToAgent"));
|
||||
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
|
||||
|
||||
static MessageRegistries() {
|
||||
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
||||
ToAgent.Add<RegisterAgentFailureMessage>(1);
|
||||
ToAgent.Add<ConfigureInstanceMessage>(2);
|
||||
ToAgent.Add<LaunchInstanceMessage>(3);
|
||||
ToAgent.Add<StopInstanceMessage>(4);
|
||||
ToAgent.Add<SendCommandToInstanceMessage>(5);
|
||||
ToAgent.Add<RegisterAgentSuccessMessage, NoReply>(0);
|
||||
ToAgent.Add<RegisterAgentFailureMessage, NoReply>(1);
|
||||
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
|
||||
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
|
||||
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
|
||||
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
|
||||
ToAgent.Add<ReplyMessage, NoReply>(127);
|
||||
|
||||
ToServer.Add<RegisterAgentMessage>(0);
|
||||
ToServer.Add<UnregisterAgentMessage>(1);
|
||||
ToServer.Add<AgentIsAliveMessage>(2);
|
||||
ToServer.Add<AdvertiseJavaRuntimesMessage>(3);
|
||||
ToServer.Add<ReportInstanceStatusMessage>(4);
|
||||
ToServer.Add<InstanceOutputMessage>(5);
|
||||
ToServer.Add<SimpleReplyMessage>(127);
|
||||
ToServer.Add<RegisterAgentMessage, NoReply>(0);
|
||||
ToServer.Add<UnregisterAgentMessage, NoReply>(1);
|
||||
ToServer.Add<AgentIsAliveMessage, NoReply>(2);
|
||||
ToServer.Add<AdvertiseJavaRuntimesMessage, NoReply>(3);
|
||||
ToServer.Add<ReportInstanceStatusMessage, NoReply>(4);
|
||||
ToServer.Add<InstanceOutputMessage, NoReply>(5);
|
||||
ToServer.Add<ReplyMessage, NoReply>(127);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Replies;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -7,8 +8,8 @@ namespace Phantom.Common.Messages.ToAgent;
|
||||
public sealed partial record ConfigureInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
|
||||
) : IMessageToAgent, IMessageWithReply {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
|
||||
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleConfigureInstance(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Replies;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -6,8 +7,8 @@ namespace Phantom.Common.Messages.ToAgent;
|
||||
public sealed partial record LaunchInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] Guid InstanceGuid
|
||||
) : IMessageToAgent, IMessageWithReply {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
|
||||
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleLaunchInstance(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -7,7 +8,7 @@ namespace Phantom.Common.Messages.ToAgent;
|
||||
public sealed partial record RegisterAgentFailureMessage(
|
||||
[property: MemoryPackOrder(0)] RegisterAgentFailure FailureKind
|
||||
) : IMessageToAgent {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleRegisterAgentFailureResult(this);
|
||||
public Task<NoReply> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleRegisterAgentFailure(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -8,7 +9,7 @@ namespace Phantom.Common.Messages.ToAgent;
|
||||
public sealed partial record RegisterAgentSuccessMessage(
|
||||
[property: MemoryPackOrder(0)] ImmutableArray<InstanceConfiguration> InitialInstances
|
||||
) : IMessageToAgent {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleRegisterAgentSuccessResult(this);
|
||||
public Task<NoReply> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleRegisterAgentSuccess(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Replies;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -7,8 +8,8 @@ public sealed partial record SendCommandToInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(2)] string Command
|
||||
) : IMessageToAgent, IMessageWithReply {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
|
||||
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleSendCommandToInstance(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
|
||||
namespace Phantom.Common.Messages.ToAgent;
|
||||
|
||||
@ -8,8 +9,8 @@ public sealed partial record StopInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy
|
||||
) : IMessageToAgent, IMessageWithReply {
|
||||
public Task Accept(IMessageToAgentListener listener) {
|
||||
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
|
||||
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleStopInstance(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Java;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
@ -8,7 +9,7 @@ namespace Phantom.Common.Messages.ToServer;
|
||||
public sealed partial record AdvertiseJavaRuntimesMessage(
|
||||
[property: MemoryPackOrder(0)] ImmutableArray<TaggedJavaRuntime> Runtimes
|
||||
) : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleAdvertiseJavaRuntimes(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,11 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
[MemoryPackable]
|
||||
public sealed partial record AgentIsAliveMessage : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleAgentIsAlive(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using System.Collections.Immutable;
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
@ -8,7 +9,7 @@ public sealed partial record InstanceOutputMessage(
|
||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
|
||||
) : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleInstanceOutput(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
@ -8,7 +9,7 @@ public sealed partial record RegisterAgentMessage(
|
||||
[property: MemoryPackOrder(0)] AgentAuthToken AuthToken,
|
||||
[property: MemoryPackOrder(1)] AgentInfo AgentInfo
|
||||
) : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleRegisterAgent(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
@ -8,7 +9,7 @@ public sealed partial record ReportInstanceStatusMessage(
|
||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(1)] IInstanceStatus InstanceStatus
|
||||
) : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleReportInstanceStatus(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,22 +0,0 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using MemoryPack;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
[MemoryPackable]
|
||||
public sealed partial record SimpleReplyMessage(
|
||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||
[property: MemoryPackOrder(1)] int EnumValue
|
||||
) : IMessageToServer {
|
||||
public static SimpleReplyMessage FromEnum<TEnum>(uint sequenceId, TEnum enumValue) where TEnum : Enum {
|
||||
if (Unsafe.SizeOf<TEnum>() != Unsafe.SizeOf<int>()) {
|
||||
throw new ArgumentException("Enum type " + typeof(TEnum).Name + " is not compatible with int.", nameof(TEnum));
|
||||
}
|
||||
|
||||
return new SimpleReplyMessage(sequenceId, Unsafe.As<TEnum, int>(ref enumValue));
|
||||
}
|
||||
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleSimpleReply(this);
|
||||
}
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
using MemoryPack;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Common.Messages.ToServer;
|
||||
|
||||
@ -6,7 +7,7 @@ namespace Phantom.Common.Messages.ToServer;
|
||||
public sealed partial record UnregisterAgentMessage(
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid
|
||||
) : IMessageToServer {
|
||||
public Task Accept(IMessageToServerListener listener) {
|
||||
public Task<NoReply> Accept(IMessageToServerListener listener) {
|
||||
return listener.HandleUnregisterAgent(this);
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,11 @@
|
||||
<Project>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.0-rc.1.22427.2" />
|
||||
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.0-rc.1.22427.2" />
|
||||
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.0-rc.1.22427.2" />
|
||||
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.0-rc.1.22426.7" />
|
||||
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.0-rc.1" />
|
||||
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.1" />
|
||||
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.1" />
|
||||
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.1" />
|
||||
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.1" />
|
||||
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
@ -13,22 +13,22 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Update="MemoryPack" Version="1.4.1" />
|
||||
<PackageReference Update="MemoryPack" Version="1.9.7" />
|
||||
<PackageReference Update="NetMQ" Version="4.0.1.10" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Update="Serilog" Version="2.12.0" />
|
||||
<PackageReference Update="Serilog.AspNetCore" Version="6.0.1" />
|
||||
<PackageReference Update="Serilog.AspNetCore" Version="6.1.0" />
|
||||
<PackageReference Update="Serilog.Sinks.Console" Version="4.1.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Update="coverlet.collector" Version="3.1.2" />
|
||||
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.3.2" />
|
||||
<PackageReference Update="coverlet.collector" Version="3.2.0" />
|
||||
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.4.1" />
|
||||
<PackageReference Update="NUnit" Version="3.13.3" />
|
||||
<PackageReference Update="NUnit.Analyzers" Version="3.3.0" />
|
||||
<PackageReference Update="NUnit3TestAdapter" Version="4.2.1" />
|
||||
<PackageReference Update="NUnit.Analyzers" Version="3.5.0" />
|
||||
<PackageReference Update="NUnit3TestAdapter" Version="4.3.1" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
@ -1,6 +1,8 @@
|
||||
using NetMQ;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Server.Rpc;
|
||||
|
||||
@ -8,12 +10,15 @@ public sealed class RpcClientConnection {
|
||||
private readonly ServerSocket socket;
|
||||
private readonly uint routingId;
|
||||
|
||||
private readonly MessageReplyTracker messageReplyTracker;
|
||||
|
||||
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
||||
private bool isClosed;
|
||||
|
||||
internal RpcClientConnection(ServerSocket socket, uint routingId) {
|
||||
internal RpcClientConnection(ServerSocket socket, uint routingId, MessageReplyTracker messageReplyTracker) {
|
||||
this.socket = socket;
|
||||
this.routingId = routingId;
|
||||
this.messageReplyTracker = messageReplyTracker;
|
||||
}
|
||||
|
||||
public bool IsSame(RpcClientConnection other) {
|
||||
@ -29,14 +34,32 @@ public sealed class RpcClientConnection {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||
if (isClosed) {
|
||||
return; // TODO
|
||||
}
|
||||
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<TReply> {
|
||||
return isClosed ? Array.Empty<byte>() : MessageRegistries.ToAgent.Write<TMessage, TReply>(message).ToArray();
|
||||
}
|
||||
|
||||
byte[] bytes = MessageRegistries.ToAgent.Write(message).ToArray();
|
||||
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||
var bytes = WriteBytes<TMessage, NoReply>(message);
|
||||
if (bytes.Length > 0) {
|
||||
await socket.SendAsync(routingId, bytes);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
var sequenceId = messageReplyTracker.RegisterReply();
|
||||
var message = messageFactory(sequenceId);
|
||||
|
||||
var bytes = WriteBytes<TMessage, TReply>(message);
|
||||
if (bytes.Length == 0) {
|
||||
messageReplyTracker.ForgetReply(sequenceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
await socket.SendAsync(routingId, bytes);
|
||||
return await messageReplyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
|
||||
public void Receive(ReplyMessage message) {
|
||||
messageReplyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
using Phantom.Common.Messages;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Utils.Rpc;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog.Events;
|
||||
using ILogger = Serilog.ILogger;
|
||||
@ -38,7 +39,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
|
||||
logger.Information("ZeroMQ server initialized, listening for agent connections on port {Port}.", config.Port);
|
||||
}
|
||||
|
||||
protected override void Run(ServerSocket socket, TaskManager taskManager) {
|
||||
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
||||
var logger = config.Logger;
|
||||
var clients = new Dictionary<ulong, Client>();
|
||||
|
||||
@ -60,19 +61,17 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
|
||||
continue;
|
||||
}
|
||||
|
||||
var connection = new RpcClientConnection(socket, routingId);
|
||||
var connection = new RpcClientConnection(socket, routingId, replyTracker);
|
||||
connection.Closed += OnConnectionClosed;
|
||||
|
||||
client = new Client(connection, listenerFactory);
|
||||
client = new Client(connection, listenerFactory, logger, taskManager, cancellationToken);
|
||||
clients[routingId] = client;
|
||||
}
|
||||
|
||||
LogMessageType(logger, routingId, data);
|
||||
MessageRegistries.ToServer.Handle(data, client.Listener, taskManager, cancellationToken);
|
||||
MessageRegistries.ToServer.Handle(data, client);
|
||||
|
||||
if (client.Listener.IsDisposed) {
|
||||
client.Connection.Close();
|
||||
}
|
||||
client.CloseIfDisposed();
|
||||
}
|
||||
|
||||
foreach (var client in clients.Values) {
|
||||
@ -102,13 +101,21 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
|
||||
return false;
|
||||
}
|
||||
|
||||
private readonly struct Client {
|
||||
private sealed class Client : MessageHandler<IMessageToServerListener> {
|
||||
public RpcClientConnection Connection { get; }
|
||||
public IMessageToServerListener Listener { get; }
|
||||
|
||||
public Client(RpcClientConnection connection, Func<RpcClientConnection, IMessageToServerListener> listenerFactory) {
|
||||
public Client(RpcClientConnection connection, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listenerFactory(connection), logger, taskManager, cancellationToken) {
|
||||
Connection = connection;
|
||||
Listener = listenerFactory(connection);
|
||||
}
|
||||
|
||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||
return Connection.Send(new ReplyMessage(sequenceId, serializedReply));
|
||||
}
|
||||
|
||||
public void CloseIfDisposed() {
|
||||
if (Listener.IsDisposed) {
|
||||
Connection.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,11 @@ sealed class AgentConnection {
|
||||
connection.Close();
|
||||
}
|
||||
|
||||
public async Task SendMessage<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||
await connection.Send(message);
|
||||
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||
return connection.Send(message);
|
||||
}
|
||||
|
||||
public Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
return connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ using Phantom.Common.Messages.ToAgent;
|
||||
using Phantom.Server.Database;
|
||||
using Phantom.Server.Rpc;
|
||||
using Phantom.Server.Services.Instances;
|
||||
using Phantom.Server.Services.Rpc;
|
||||
using Phantom.Utils.Collections;
|
||||
using Phantom.Utils.Events;
|
||||
using Phantom.Utils.Runtime;
|
||||
@ -114,28 +113,14 @@ public sealed class AgentManager {
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> SendMessage<TMessage>(Guid guid, TMessage message) where TMessage : IMessageToAgent {
|
||||
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
|
||||
if (connection != null) {
|
||||
await connection.SendMessage(message);
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
if (connection == null) {
|
||||
// TODO handle missing agent?
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<int?> SendMessageWithReply<TMessage>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent, IMessageWithReply {
|
||||
var sequenceId = MessageReplyTracker.Instance.RegisterReply();
|
||||
var message = messageFactory(sequenceId);
|
||||
|
||||
if (!await SendMessage(guid, message)) {
|
||||
MessageReplyTracker.Instance.ForgetReply(sequenceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
return await MessageReplyTracker.Instance.WaitForReply(sequenceId, waitForReplyTime, cancellationToken);
|
||||
return await connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
||||
}
|
||||
|
||||
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
|
||||
|
@ -6,10 +6,8 @@ public enum AddInstanceResult {
|
||||
InstanceNameMustNotBeEmpty,
|
||||
InstanceMemoryMustNotBeZero,
|
||||
AgentNotFound,
|
||||
AgentShuttingDown,
|
||||
AgentInstanceLimitExceeded,
|
||||
AgentMemoryLimitExceeded,
|
||||
AgentCommunicationError,
|
||||
UnknownError
|
||||
}
|
||||
|
||||
@ -20,10 +18,8 @@ public static class AddInstanceResultExtensions {
|
||||
AddInstanceResult.InstanceNameMustNotBeEmpty => "Instance name must not be empty.",
|
||||
AddInstanceResult.InstanceMemoryMustNotBeZero => "Memory must not be 0 MB.",
|
||||
AddInstanceResult.AgentNotFound => "Agent not found.",
|
||||
AddInstanceResult.AgentShuttingDown => "Agent is shutting down.",
|
||||
AddInstanceResult.AgentInstanceLimitExceeded => "Agent instance limit exceeded.",
|
||||
AddInstanceResult.AgentMemoryLimitExceeded => "Agent memory limit exceeded.",
|
||||
AddInstanceResult.AgentCommunicationError => "Agent did not reply in time.",
|
||||
_ => "Unknown error."
|
||||
};
|
||||
}
|
||||
|
@ -54,21 +54,21 @@ public sealed class InstanceManager {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<AddInstanceResult> AddInstance(InstanceConfiguration configuration) {
|
||||
public async Task<InstanceActionResult<AddInstanceResult>> AddInstance(InstanceConfiguration configuration) {
|
||||
var agent = agentManager.GetAgent(configuration.AgentGuid);
|
||||
if (agent == null) {
|
||||
return AddInstanceResult.AgentNotFound;
|
||||
return InstanceActionResult.Concrete(AddInstanceResult.AgentNotFound);
|
||||
}
|
||||
|
||||
var instance = new Instance(configuration);
|
||||
if (!instances.ByGuid.TryAdd(instance.Configuration.InstanceGuid, instance)) {
|
||||
return AddInstanceResult.InstanceAlreadyExists;
|
||||
return InstanceActionResult.Concrete(AddInstanceResult.InstanceAlreadyExists);
|
||||
}
|
||||
|
||||
var agentName = agent.Name;
|
||||
|
||||
var reply = (ConfigureInstanceResult?) await agentManager.SendMessageWithReply(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10));
|
||||
if (reply == ConfigureInstanceResult.Success) {
|
||||
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
|
||||
if (reply.Is(ConfigureInstanceResult.Success)) {
|
||||
using (var scope = databaseProvider.CreateScope()) {
|
||||
InstanceEntity entity = scope.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
|
||||
|
||||
@ -87,20 +87,18 @@ public sealed class InstanceManager {
|
||||
}
|
||||
|
||||
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agentName);
|
||||
return AddInstanceResult.Success;
|
||||
return InstanceActionResult.Concrete(AddInstanceResult.Success);
|
||||
}
|
||||
else {
|
||||
instances.ByGuid.Remove(configuration.InstanceGuid);
|
||||
|
||||
var result = reply switch {
|
||||
null => AddInstanceResult.AgentCommunicationError,
|
||||
ConfigureInstanceResult.AgentShuttingDown => AddInstanceResult.AgentShuttingDown,
|
||||
var result = reply.Map(static result => result switch {
|
||||
ConfigureInstanceResult.InstanceLimitExceeded => AddInstanceResult.AgentInstanceLimitExceeded,
|
||||
ConfigureInstanceResult.MemoryLimitExceeded => AddInstanceResult.AgentMemoryLimitExceeded,
|
||||
_ => AddInstanceResult.UnknownError
|
||||
};
|
||||
});
|
||||
|
||||
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agentName, result.ToSentence());
|
||||
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agentName, result.ToSentence(AddInstanceResultExtensions.ToSentence));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -121,28 +119,28 @@ public sealed class InstanceManager {
|
||||
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
|
||||
}
|
||||
|
||||
public async Task<LaunchInstanceResult> LaunchInstance(Guid instanceGuid) {
|
||||
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid instanceGuid) {
|
||||
var instance = GetInstance(instanceGuid);
|
||||
if (instance == null) {
|
||||
return LaunchInstanceResult.InstanceDoesNotExist;
|
||||
return InstanceActionResult.General<LaunchInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||
}
|
||||
|
||||
await SetInstanceShouldLaunchAutomatically(instanceGuid, true);
|
||||
|
||||
var reply = (LaunchInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
|
||||
return reply ?? LaunchInstanceResult.CommunicationError;
|
||||
var reply = await agentManager.SendMessage<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
|
||||
return reply.DidNotReplyIfNull();
|
||||
}
|
||||
|
||||
public async Task<StopInstanceResult> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
||||
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
||||
var instance = GetInstance(instanceGuid);
|
||||
if (instance == null) {
|
||||
return StopInstanceResult.InstanceDoesNotExist;
|
||||
return InstanceActionResult.General<StopInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||
}
|
||||
|
||||
await SetInstanceShouldLaunchAutomatically(instanceGuid, false);
|
||||
|
||||
var reply = (StopInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
|
||||
return reply ?? StopInstanceResult.CommunicationError;
|
||||
var reply = await agentManager.SendMessage<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
|
||||
return reply.DidNotReplyIfNull();
|
||||
}
|
||||
|
||||
private async Task SetInstanceShouldLaunchAutomatically(Guid instanceGuid, bool shouldLaunchAutomatically) {
|
||||
@ -158,14 +156,14 @@ public sealed class InstanceManager {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<SendCommandToInstanceResult> SendCommand(Guid instanceGuid, string command) {
|
||||
public async Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid instanceGuid, string command) {
|
||||
var instance = GetInstance(instanceGuid);
|
||||
if (instance != null) {
|
||||
var reply = (SendCommandToInstanceResult?) await agentManager.SendMessageWithReply(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
|
||||
return reply ?? SendCommandToInstanceResult.AgentCommunicationError;
|
||||
if (instance == null) {
|
||||
return InstanceActionResult.General<SendCommandToInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||
}
|
||||
|
||||
return SendCommandToInstanceResult.InstanceDoesNotExist;
|
||||
var reply = await agentManager.SendMessage<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
|
||||
return reply.DidNotReplyIfNull();
|
||||
}
|
||||
|
||||
internal ImmutableArray<InstanceConfiguration> GetInstanceConfigurationsForAgent(Guid agentGuid) {
|
||||
|
@ -1,58 +0,0 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Phantom.Common.Logging;
|
||||
using Phantom.Common.Messages.ToServer;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Server.Services.Rpc;
|
||||
|
||||
sealed class MessageReplyTracker {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<MessageReplyTracker>();
|
||||
|
||||
public static MessageReplyTracker Instance { get; } = new ();
|
||||
|
||||
private uint lastSequenceId;
|
||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<int?>> simpleReplyTasks = new (4, 16);
|
||||
|
||||
private MessageReplyTracker() {}
|
||||
|
||||
public uint RegisterReply() {
|
||||
var sequenceId = Interlocked.Increment(ref lastSequenceId);
|
||||
simpleReplyTasks[sequenceId] = new TaskCompletionSource<int?>(TaskCreationOptions.None);
|
||||
return sequenceId;
|
||||
}
|
||||
|
||||
public async Task<int?> WaitForReply(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) {
|
||||
if (!simpleReplyTasks.TryGetValue(sequenceId, out var completionSource)) {
|
||||
Logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
|
||||
} catch (TimeoutException) {
|
||||
return null;
|
||||
} catch (OperationCanceledException) {
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
Logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
|
||||
return null;
|
||||
} finally {
|
||||
ForgetReply(sequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
public void ForgetReply(uint sequenceId) {
|
||||
if (simpleReplyTasks.TryRemove(sequenceId, out var task)) {
|
||||
task.SetCanceled();
|
||||
}
|
||||
}
|
||||
|
||||
public void ReceiveReply(SimpleReplyMessage message) {
|
||||
if (simpleReplyTasks.TryRemove(message.SequenceId, out var task)) {
|
||||
task.SetResult(message.EnumValue);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Received a reply with id {SequenceId} but no registered callback.", message.SequenceId);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,6 +6,7 @@ using Phantom.Common.Messages.ToServer;
|
||||
using Phantom.Server.Rpc;
|
||||
using Phantom.Server.Services.Agents;
|
||||
using Phantom.Server.Services.Instances;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
|
||||
namespace Phantom.Server.Services.Rpc;
|
||||
|
||||
@ -31,7 +32,7 @@ public sealed class MessageToServerListener : IMessageToServerListener {
|
||||
this.instanceLogManager = instanceLogManager;
|
||||
}
|
||||
|
||||
public async Task HandleRegisterAgent(RegisterAgentMessage message) {
|
||||
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
|
||||
if (agentGuid != null && agentGuid != message.AgentInfo.Guid) {
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
||||
}
|
||||
@ -40,42 +41,46 @@ public sealed class MessageToServerListener : IMessageToServerListener {
|
||||
agentGuid = guid;
|
||||
agentGuidWaiter.SetResult(guid);
|
||||
}
|
||||
|
||||
return NoReply.Instance;
|
||||
}
|
||||
|
||||
private async Task<Guid> WaitForAgentGuid() {
|
||||
return await agentGuidWaiter.Task.WaitAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public Task HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||
IsDisposed = true;
|
||||
|
||||
if (agentManager.UnregisterAgent(message.AgentGuid, connection)) {
|
||||
instanceManager.SetInstanceStatesForAgent(message.AgentGuid, InstanceStatus.Offline);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
|
||||
public async Task HandleAgentIsAlive(AgentIsAliveMessage message) {
|
||||
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
|
||||
agentManager.NotifyAgentIsAlive(await WaitForAgentGuid());
|
||||
return NoReply.Instance;
|
||||
}
|
||||
|
||||
public async Task HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
||||
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
||||
agentJavaRuntimesManager.Update(await WaitForAgentGuid(), message.Runtimes);
|
||||
return NoReply.Instance;
|
||||
}
|
||||
|
||||
public Task HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
|
||||
public Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
|
||||
instanceManager.SetInstanceState(message.InstanceGuid, message.InstanceStatus);
|
||||
return Task.CompletedTask;
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
|
||||
public Task HandleInstanceOutput(InstanceOutputMessage message) {
|
||||
public Task<NoReply> HandleInstanceOutput(InstanceOutputMessage message) {
|
||||
instanceLogManager.AddLines(message.InstanceGuid, message.Lines);
|
||||
return Task.CompletedTask;
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
|
||||
public Task HandleSimpleReply(SimpleReplyMessage message) {
|
||||
MessageReplyTracker.Instance.ReceiveReply(message);
|
||||
return Task.CompletedTask;
|
||||
public Task<NoReply> HandleReply(ReplyMessage message) {
|
||||
connection.Receive(message);
|
||||
return Task.FromResult(NoReply.Instance);
|
||||
}
|
||||
}
|
||||
|
@ -294,12 +294,12 @@
|
||||
|
||||
var instance = new InstanceConfiguration(selectedAgent.Agent.Guid, instanceGuid, form.InstanceName, serverPort, rconPort, form.MinecraftVersion, form.MinecraftServerKind, memoryAllocation, javaRuntimeGuid, jvmArguments, LaunchAutomatically: false);
|
||||
var result = await InstanceManager.AddInstance(instance);
|
||||
if (result == AddInstanceResult.Success) {
|
||||
if (result.Is(AddInstanceResult.Success)) {
|
||||
await AuditLog.AddInstanceCreatedEvent(instance.InstanceGuid);
|
||||
Nav.NavigateTo("instances/" + instance.InstanceGuid);
|
||||
}
|
||||
else {
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence());
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence(AddInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,11 +71,11 @@ else {
|
||||
}
|
||||
|
||||
var result = await InstanceManager.LaunchInstance(InstanceGuid);
|
||||
if (result == LaunchInstanceResult.LaunchInitiated) {
|
||||
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
|
||||
await AuditLog.AddInstanceLaunchedEvent(InstanceGuid);
|
||||
}
|
||||
else {
|
||||
lastError = result.ToSentence();
|
||||
lastError = result.ToSentence(LaunchInstanceResultExtensions.ToSentence);
|
||||
}
|
||||
} finally {
|
||||
isLaunchingInstance = false;
|
||||
|
@ -1,6 +1,6 @@
|
||||
@using Phantom.Common.Data.Replies
|
||||
@using Phantom.Server.Services.Instances
|
||||
@using Phantom.Server.Services.Audit
|
||||
@using Phantom.Server.Services.Instances
|
||||
@using Phantom.Common.Data.Replies
|
||||
@inherits PhantomComponent
|
||||
@inject InstanceManager InstanceManager
|
||||
@inject AuditLog AuditLog
|
||||
@ -40,13 +40,13 @@
|
||||
}
|
||||
|
||||
var result = await InstanceManager.SendCommand(InstanceGuid, form.Command);
|
||||
if (result == SendCommandToInstanceResult.Success) {
|
||||
if (result.Is(SendCommandToInstanceResult.Success)) {
|
||||
await AuditLog.AddInstanceCommandExecutedEvent(InstanceGuid, form.Command);
|
||||
form.Command = string.Empty;
|
||||
form.SubmitModel.StopSubmitting();
|
||||
}
|
||||
else {
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence());
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence(SendCommandToInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
|
||||
await commandInputElement.FocusAsync(preventScroll: true);
|
||||
|
@ -1,8 +1,8 @@
|
||||
@using Phantom.Common.Data.Replies
|
||||
@using Phantom.Server.Services.Instances
|
||||
@using Phantom.Server.Services.Audit
|
||||
@using Phantom.Server.Services.Instances
|
||||
@using System.ComponentModel.DataAnnotations
|
||||
@using Phantom.Common.Data.Minecraft
|
||||
@using Phantom.Common.Data.Replies
|
||||
@inherits PhantomComponent
|
||||
@inject IJSRuntime Js;
|
||||
@inject InstanceManager InstanceManager;
|
||||
@ -57,13 +57,13 @@
|
||||
}
|
||||
|
||||
var result = await InstanceManager.StopInstance(InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds));
|
||||
if (result == StopInstanceResult.StopInitiated) {
|
||||
if (result.Is(StopInstanceResult.StopInitiated)) {
|
||||
await AuditLog.AddInstanceStoppedEvent(InstanceGuid, form.StopInSeconds);
|
||||
await Js.InvokeVoidAsync("closeModal", ModalId);
|
||||
form.SubmitModel.StopSubmitting();
|
||||
}
|
||||
else {
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence());
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence(StopInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public interface IMessage<TListener> {
|
||||
Task Accept(TListener listener);
|
||||
public interface IMessage<TListener, TReply> {
|
||||
public uint SequenceId { get; }
|
||||
Task<TReply> Accept(TListener listener);
|
||||
}
|
||||
|
40
Utils/Phantom.Utils.Rpc/Message/MessageHandler.cs
Normal file
40
Utils/Phantom.Utils.Rpc/Message/MessageHandler.cs
Normal file
@ -0,0 +1,40 @@
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public abstract class MessageHandler<TListener> {
|
||||
protected TListener Listener { get; }
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
|
||||
this.Listener = listener;
|
||||
this.logger = logger;
|
||||
this.taskManager = taskManager;
|
||||
this.cancellationToken = cancellationToken;
|
||||
}
|
||||
|
||||
internal void Enqueue<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
taskManager.Run(async () => {
|
||||
try {
|
||||
await Handle<TMessage, TReply>(message);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async Task Handle<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||
TReply reply = await message.Accept(Listener);
|
||||
|
||||
if (reply is not NoReply) {
|
||||
await SendReply(message.SequenceId, MessageSerializer.Serialize(reply));
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
|
||||
}
|
@ -1,27 +1,26 @@
|
||||
using System.Buffers;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase : class, IMessage<TListener> {
|
||||
public sealed class MessageRegistry<TListener> {
|
||||
private const int DefaultBufferSize = 512;
|
||||
|
||||
private readonly ILogger logger;
|
||||
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
|
||||
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
|
||||
private readonly Dictionary<ushort, Func<ReadOnlyMemory<byte>, TMessageBase>> codeToDeserializerMapping = new ();
|
||||
private readonly Dictionary<ushort, Action<ReadOnlyMemory<byte>, ushort, MessageHandler<TListener>>> codeToHandlerMapping = new ();
|
||||
|
||||
public MessageRegistry(ILogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
|
||||
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
|
||||
typeToCodeMapping.Add(typeof(TMessage), code);
|
||||
codeToTypeMapping.Add(code, typeof(TMessage));
|
||||
codeToDeserializerMapping.Add(code, MessageSerializer.Deserialize<TMessage, TMessageBase, TListener>());
|
||||
codeToHandlerMapping.Add(code, HandleInternal<TMessage, TReply>);
|
||||
}
|
||||
|
||||
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
||||
@ -34,7 +33,11 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
|
||||
}
|
||||
}
|
||||
|
||||
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : TMessageBase {
|
||||
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
||||
return Write<TMessage, NoReply>(message);
|
||||
}
|
||||
|
||||
public ReadOnlySpan<byte> Write<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
||||
logger.Error("Unknown message type {Type}.", typeof(TMessage));
|
||||
return default;
|
||||
@ -44,7 +47,7 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
|
||||
|
||||
try {
|
||||
MessageSerializer.WriteCode(buffer, code);
|
||||
MessageSerializer.Serialize<TMessage, TListener>(buffer, message);
|
||||
MessageSerializer.Serialize(buffer, message);
|
||||
|
||||
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||
logger.Verbose("Serializing {Type} exceeded default buffer size: {WrittenSize} B > {DefaultBufferSize} B", typeof(TMessage).Name, buffer.WrittenCount, DefaultBufferSize);
|
||||
@ -57,7 +60,7 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
|
||||
}
|
||||
}
|
||||
|
||||
public void Handle(ReadOnlyMemory<byte> data, TListener listener, TaskManager taskManager, CancellationToken cancellationToken) {
|
||||
public void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
|
||||
ushort code;
|
||||
try {
|
||||
code = MessageSerializer.ReadCode(ref data);
|
||||
@ -66,28 +69,23 @@ public sealed class MessageRegistry<TListener, TMessageBase> where TMessageBase
|
||||
return;
|
||||
}
|
||||
|
||||
if (!codeToDeserializerMapping.TryGetValue(code, out var deserialize)) {
|
||||
if (!codeToHandlerMapping.TryGetValue(code, out var handle)) {
|
||||
logger.Error("Unknown message code {Code}.", code);
|
||||
return;
|
||||
}
|
||||
|
||||
TMessageBase message;
|
||||
handle(data, code, handler);
|
||||
}
|
||||
|
||||
private void HandleInternal<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
|
||||
TMessage message;
|
||||
try {
|
||||
message = deserialize(data);
|
||||
message = MessageSerializer.Deserialize<TMessage>(data);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Failed to deserialize message with code {Code}.", code);
|
||||
return;
|
||||
}
|
||||
|
||||
async Task HandleMessage() {
|
||||
try {
|
||||
await message.Accept(listener);
|
||||
} catch (Exception e) {
|
||||
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
||||
}
|
||||
}
|
||||
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
taskManager.Run(HandleMessage);
|
||||
handler.Enqueue<TMessage, TReply>(message);
|
||||
}
|
||||
}
|
||||
|
57
Utils/Phantom.Utils.Rpc/Message/MessageReplyTracker.cs
Normal file
57
Utils/Phantom.Utils.Rpc/Message/MessageReplyTracker.cs
Normal file
@ -0,0 +1,57 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public sealed class MessageReplyTracker {
|
||||
private readonly ILogger logger;
|
||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
|
||||
|
||||
private uint lastSequenceId;
|
||||
|
||||
internal MessageReplyTracker(ILogger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public uint RegisterReply() {
|
||||
var sequenceId = Interlocked.Increment(ref lastSequenceId);
|
||||
replyTasks[sequenceId] = new TaskCompletionSource<byte[]>(TaskCreationOptions.None);
|
||||
return sequenceId;
|
||||
}
|
||||
|
||||
public async Task<TReply?> WaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
|
||||
if (!replyTasks.TryGetValue(sequenceId, out var completionSource)) {
|
||||
logger.Warning("No reply callback for id {SequenceId}.", sequenceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
byte[] replyBytes = await completionSource.Task.WaitAsync(waitForReplyTime, cancellationToken);
|
||||
return MessageSerializer.Deserialize<TReply>(replyBytes);
|
||||
} catch (TimeoutException) {
|
||||
return null;
|
||||
} catch (OperationCanceledException) {
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
logger.Warning(e, "Error processing reply with id {SequenceId}.", sequenceId);
|
||||
return null;
|
||||
} finally {
|
||||
ForgetReply(sequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
public void ForgetReply(uint sequenceId) {
|
||||
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
||||
task.SetCanceled();
|
||||
}
|
||||
}
|
||||
|
||||
public void ReceiveReply(uint sequenceId, byte[] serializedReply) {
|
||||
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
||||
task.SetResult(serializedReply);
|
||||
}
|
||||
else {
|
||||
logger.Warning("Received a reply with id {SequenceId} but no registered callback.", sequenceId);
|
||||
}
|
||||
}
|
||||
}
|
@ -5,14 +5,18 @@ using MemoryPack;
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
static class MessageSerializer {
|
||||
private static readonly MemoryPackSerializeOptions SerializerOptions = MemoryPackSerializeOptions.Utf8;
|
||||
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
|
||||
|
||||
public static void Serialize<TMessage, TListener>(IBufferWriter<byte> destination, TMessage message) where TMessage : IMessage<TListener> {
|
||||
MemoryPackSerializer.Serialize(typeof(TMessage), destination, message, SerializerOptions);
|
||||
public static byte[] Serialize<T>(T message) {
|
||||
return MemoryPackSerializer.Serialize(typeof(T), message, SerializerOptions);
|
||||
}
|
||||
|
||||
public static Func<ReadOnlyMemory<byte>, TMessageBase> Deserialize<TMessage, TMessageBase, TListener>() where TMessageBase : IMessage<TListener> where TMessage : TMessageBase {
|
||||
return static memory => MemoryPackSerializer.Deserialize<TMessage>(memory.Span) ?? throw new NullReferenceException();
|
||||
public static void Serialize<T>(IBufferWriter<byte> destination, T message) {
|
||||
MemoryPackSerializer.Serialize(typeof(T), destination, message, SerializerOptions);
|
||||
}
|
||||
|
||||
public static T Deserialize<T>(ReadOnlyMemory<byte> memory) {
|
||||
return MemoryPackSerializer.Deserialize<T>(memory.Span) ?? throw new NullReferenceException();
|
||||
}
|
||||
|
||||
public static void WriteCode(IBufferWriter<byte> destination, ushort value) {
|
||||
|
5
Utils/Phantom.Utils.Rpc/Message/NoReply.cs
Normal file
5
Utils/Phantom.Utils.Rpc/Message/NoReply.cs
Normal file
@ -0,0 +1,5 @@
|
||||
namespace Phantom.Utils.Rpc.Message;
|
||||
|
||||
public readonly struct NoReply {
|
||||
public static NoReply Instance { get; } = new ();
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
using NetMQ;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Runtime;
|
||||
using Serilog;
|
||||
|
||||
@ -25,6 +26,7 @@ static class RpcRuntime {
|
||||
|
||||
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
|
||||
private readonly TSocket socket;
|
||||
private readonly MessageReplyTracker replyTracker;
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly ILogger logger;
|
||||
|
||||
@ -33,6 +35,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
|
||||
RpcRuntime.SetDefaultSocketOptions(socket.Options);
|
||||
this.socket = socket;
|
||||
this.logger = logger;
|
||||
this.replyTracker = new MessageReplyTracker(logger);
|
||||
this.taskManager = new TaskManager();
|
||||
}
|
||||
|
||||
@ -40,7 +43,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
|
||||
Connect(socket);
|
||||
|
||||
void RunTask() {
|
||||
Run(socket, taskManager);
|
||||
Run(socket, replyTracker, taskManager);
|
||||
}
|
||||
|
||||
try {
|
||||
@ -50,7 +53,7 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
|
||||
} finally {
|
||||
logger.Information("Stopping task manager...");
|
||||
await taskManager.Stop();
|
||||
await Disconnect(socket);
|
||||
await Disconnect();
|
||||
|
||||
socket.Dispose();
|
||||
NetMQConfig.Cleanup();
|
||||
@ -59,9 +62,9 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
|
||||
}
|
||||
|
||||
protected abstract void Connect(TSocket socket);
|
||||
protected abstract void Run(TSocket socket, TaskManager taskManager);
|
||||
protected abstract void Run(TSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager);
|
||||
|
||||
protected virtual Task Disconnect(TSocket socket) {
|
||||
protected virtual Task Disconnect() {
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user