mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 16:42:54 +01:00
Compare commits
1 Commits
5bc474d455
...
8ea57f9970
Author | SHA1 | Date | |
---|---|---|---|
8ea57f9970 |
@ -1,6 +1,6 @@
|
|||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Agent.Rpc;
|
namespace Phantom.Agent.Rpc;
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Agent.Rpc;
|
namespace Phantom.Agent.Rpc;
|
||||||
@ -26,7 +26,7 @@ sealed class KeepAliveLoop {
|
|||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
await Task.Delay(KeepAliveInterval, cancellationToken);
|
await Task.Delay(KeepAliveInterval, cancellationToken);
|
||||||
await connection.Send(new AgentIsAliveMessage());
|
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
|
||||||
}
|
}
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
|
@ -1,33 +1,31 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
using Phantom.Common.Data.Agent;
|
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Agent.Rpc;
|
namespace Phantom.Agent.Rpc;
|
||||||
|
|
||||||
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
|
public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> {
|
||||||
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, AgentInfo agentInfo, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
public static Task Launch(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
|
||||||
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
|
return new RpcClientRuntime(socket, messageListener, disconnectSemaphore, receiveCancellationToken).Launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
private RpcClientRuntime(RpcClientSocket<IMessageToAgentListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToAgentListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
||||||
|
|
||||||
protected override void RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection, ILogger logger, TaskManager taskManager) {
|
protected override async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<IMessageToControllerListener> connection) {
|
||||||
var keepAliveLoop = new KeepAliveLoop(connection);
|
var keepAliveLoop = new KeepAliveLoop(connection);
|
||||||
try {
|
try {
|
||||||
base.RunWithConnection(socket, connection, logger, taskManager);
|
await base.RunWithConnection(socket, connection);
|
||||||
} finally {
|
} finally {
|
||||||
keepAliveLoop.Cancel();
|
keepAliveLoop.Cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task Disconnect(ClientSocket socket, ILogger logger) {
|
protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
|
||||||
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
|
var unregisterMessageBytes = AgentMessageRegistries.ToController.Write(new UnregisterAgentMessage()).ToArray();
|
||||||
try {
|
try {
|
||||||
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||||
|
@ -21,7 +21,6 @@ sealed class Instance : IAsyncDisposable {
|
|||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
|
|
||||||
private IInstanceStatus currentStatus;
|
private IInstanceStatus currentStatus;
|
||||||
private int statusUpdateCounter;
|
|
||||||
|
|
||||||
private IInstanceState currentState;
|
private IInstanceState currentState;
|
||||||
public bool IsRunning => currentState is not InstanceNotRunningState;
|
public bool IsRunning => currentState is not InstanceNotRunningState;
|
||||||
@ -38,40 +37,23 @@ sealed class Instance : IAsyncDisposable {
|
|||||||
this.Configuration = configuration;
|
this.Configuration = configuration;
|
||||||
this.Launcher = launcher;
|
this.Launcher = launcher;
|
||||||
|
|
||||||
this.currentState = new InstanceNotRunningState();
|
|
||||||
this.currentStatus = InstanceStatus.NotRunning;
|
this.currentStatus = InstanceStatus.NotRunning;
|
||||||
|
this.currentState = new InstanceNotRunningState();
|
||||||
|
|
||||||
this.procedureManager = new InstanceProcedureManager(this, new Context(this), services.TaskManager);
|
this.procedureManager = new InstanceProcedureManager(this, new Context(this), services.TaskManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void TryUpdateStatus(string taskName, Func<Task> getUpdateTask) {
|
|
||||||
int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter);
|
|
||||||
|
|
||||||
Services.TaskManager.Run(taskName, async () => {
|
|
||||||
if (myStatusUpdateCounter == statusUpdateCounter) {
|
|
||||||
await getUpdateTask();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void ReportLastStatus() {
|
public void ReportLastStatus() {
|
||||||
TryUpdateStatus("Report last status of instance " + shortName, async () => {
|
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
||||||
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ReportAndSetStatus(IInstanceStatus status) {
|
private void ReportAndSetStatus(IInstanceStatus status) {
|
||||||
TryUpdateStatus("Report status of instance " + shortName + " as " + status.GetType().Name, async () => {
|
|
||||||
if (status != currentStatus) {
|
|
||||||
currentStatus = status;
|
currentStatus = status;
|
||||||
await Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
|
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ReportEvent(IInstanceEvent instanceEvent) {
|
private void ReportEvent(IInstanceEvent instanceEvent) {
|
||||||
var message = new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent);
|
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent));
|
||||||
Services.TaskManager.Run("Report event for instance " + shortName, async () => await Services.ControllerConnection.Send(message));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void TransitionState(IInstanceState newState) {
|
internal void TransitionState(IInstanceState newState) {
|
||||||
|
@ -36,14 +36,14 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
|||||||
try {
|
try {
|
||||||
while (await lineReader.WaitToReadAsync(CancellationToken)) {
|
while (await lineReader.WaitToReadAsync(CancellationToken)) {
|
||||||
await Task.Delay(SendDelay, CancellationToken);
|
await Task.Delay(SendDelay, CancellationToken);
|
||||||
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
||||||
}
|
}
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush remaining lines.
|
// Flush remaining lines.
|
||||||
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
|
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
|
||||||
@ -61,9 +61,9 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
|
|||||||
return builder.ToImmutable();
|
return builder.ToImmutable();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task SendOutputToServer(ImmutableArray<string> lines) {
|
private void SendOutputToServer(ImmutableArray<string> lines) {
|
||||||
if (!lines.IsEmpty) {
|
if (!lines.IsEmpty) {
|
||||||
await controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
|
controllerConnection.Send(new InstanceOutputMessage(instanceGuid, lines));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,8 +5,8 @@ using Phantom.Common.Messages.Agent.BiDirectional;
|
|||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Agent.Services.Rpc;
|
namespace Phantom.Agent.Services.Rpc;
|
||||||
|
@ -11,7 +11,6 @@ using Phantom.Utils.Logging;
|
|||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
|
|
||||||
const int ProtocolVersion = 1;
|
const int ProtocolVersion = 1;
|
||||||
|
|
||||||
@ -52,7 +51,7 @@ try {
|
|||||||
|
|
||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
|
||||||
|
|
||||||
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
|
||||||
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
|
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, AgentMessageRegistries.Definitions, new RegisterAgentMessage(agentToken, agentInfo));
|
||||||
|
|
||||||
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
|
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks), new ControllerConnection(rpcSocket.Connection));
|
||||||
@ -60,7 +59,7 @@ try {
|
|||||||
|
|
||||||
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
||||||
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
|
var rpcMessageListener = new MessageListener(rpcSocket.Connection, agentServices, shutdownCancellationTokenSource);
|
||||||
var rpcTask = RpcClientRuntime.Launch(rpcSocket, agentInfo, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
|
var rpcTask = RpcClientRuntime.Launch(rpcSocket, rpcMessageListener, rpcDisconnectSemaphore, shutdownCancellationToken);
|
||||||
try {
|
try {
|
||||||
await rpcTask.WaitAsync(shutdownCancellationToken);
|
await rpcTask.WaitAsync(shutdownCancellationToken);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -8,6 +8,11 @@ public sealed partial record ReplyMessage(
|
|||||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
[property: MemoryPackOrder(0)] uint SequenceId,
|
||||||
[property: MemoryPackOrder(1)] byte[] SerializedReply
|
[property: MemoryPackOrder(1)] byte[] SerializedReply
|
||||||
) : IMessageToController, IMessageToAgent, IReply {
|
) : IMessageToController, IMessageToAgent, IReply {
|
||||||
|
private static readonly MessageQueueKey MessageQueueKey = new ("Reply");
|
||||||
|
|
||||||
|
[MemoryPackIgnore]
|
||||||
|
public MessageQueueKey QueueKey => MessageQueueKey;
|
||||||
|
|
||||||
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
||||||
return listener.HandleReply(this);
|
return listener.HandleReply(this);
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
namespace Phantom.Common.Messages.Agent;
|
namespace Phantom.Common.Messages.Agent;
|
||||||
|
|
||||||
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
|
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {
|
||||||
|
MessageQueueKey IMessage<IMessageToAgentListener, TReply>.QueueKey => IMessageToAgent.DefaultQueueKey;
|
||||||
|
}
|
||||||
|
|
||||||
public interface IMessageToAgent : IMessageToAgent<NoReply> {}
|
public interface IMessageToAgent : IMessageToAgent<NoReply> {
|
||||||
|
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
|
||||||
|
MessageQueueKey IMessage<IMessageToAgentListener, NoReply>.QueueKey => DefaultQueueKey;
|
||||||
|
}
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
namespace Phantom.Common.Messages.Agent;
|
namespace Phantom.Common.Messages.Agent;
|
||||||
|
|
||||||
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {}
|
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
|
||||||
|
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
|
||||||
|
}
|
||||||
|
|
||||||
public interface IMessageToController : IMessageToController<NoReply> {}
|
public interface IMessageToController : IMessageToController<NoReply> {
|
||||||
|
internal static readonly MessageQueueKey DefaultQueueKey = new ("Agent.Default");
|
||||||
|
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
|
||||||
|
}
|
||||||
|
@ -9,6 +9,11 @@ public sealed partial record InstanceOutputMessage(
|
|||||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||||
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
|
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
|
||||||
) : IMessageToController {
|
) : IMessageToController {
|
||||||
|
private static readonly MessageQueueKey MessageQueueKey = new ("Agent.InstanceOutput");
|
||||||
|
|
||||||
|
[MemoryPackIgnore]
|
||||||
|
public MessageQueueKey QueueKey => MessageQueueKey;
|
||||||
|
|
||||||
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
public Task<NoReply> Accept(IMessageToControllerListener listener) {
|
||||||
return listener.HandleInstanceOutput(this);
|
return listener.HandleInstanceOutput(this);
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
namespace Phantom.Common.Messages.Web;
|
namespace Phantom.Common.Messages.Web;
|
||||||
|
|
||||||
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {}
|
public interface IMessageToController<TReply> : IMessage<IMessageToControllerListener, TReply> {
|
||||||
|
MessageQueueKey IMessage<IMessageToControllerListener, TReply>.QueueKey => IMessageToController.DefaultQueueKey;
|
||||||
|
}
|
||||||
|
|
||||||
public interface IMessageToController : IMessageToController<NoReply> {}
|
public interface IMessageToController : IMessageToController<NoReply> {
|
||||||
|
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
|
||||||
|
MessageQueueKey IMessage<IMessageToControllerListener, NoReply>.QueueKey => DefaultQueueKey;
|
||||||
|
}
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
namespace Phantom.Common.Messages.Web;
|
namespace Phantom.Common.Messages.Web;
|
||||||
|
|
||||||
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {}
|
public interface IMessageToWeb<TReply> : IMessage<IMessageToWebListener, TReply> {
|
||||||
|
MessageQueueKey IMessage<IMessageToWebListener, TReply>.QueueKey => IMessageToWeb.DefaultQueueKey;
|
||||||
|
}
|
||||||
|
|
||||||
public interface IMessageToWeb : IMessageToWeb<NoReply> {}
|
public interface IMessageToWeb : IMessageToWeb<NoReply> {
|
||||||
|
internal static readonly MessageQueueKey DefaultQueueKey = new ("Web.Default");
|
||||||
|
MessageQueueKey IMessage<IMessageToWebListener, NoReply>.QueueKey => DefaultQueueKey;
|
||||||
|
}
|
||||||
|
@ -9,6 +9,11 @@ public sealed partial record InstanceOutputMessage(
|
|||||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||||
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
|
[property: MemoryPackOrder(1)] ImmutableArray<string> Lines
|
||||||
) : IMessageToWeb {
|
) : IMessageToWeb {
|
||||||
|
private static readonly MessageQueueKey MessageQueueKey = new ("Web.InstanceOutput");
|
||||||
|
|
||||||
|
[MemoryPackIgnore]
|
||||||
|
public MessageQueueKey QueueKey => MessageQueueKey;
|
||||||
|
|
||||||
public Task<NoReply> Accept(IMessageToWebListener listener) {
|
public Task<NoReply> Accept(IMessageToWebListener listener) {
|
||||||
return listener.HandleInstanceOutput(this);
|
return listener.HandleInstanceOutput(this);
|
||||||
}
|
}
|
||||||
|
@ -1,81 +0,0 @@
|
|||||||
using NetMQ;
|
|
||||||
using NetMQ.Sockets;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
|
||||||
|
|
||||||
namespace Phantom.Controller.Rpc;
|
|
||||||
|
|
||||||
public sealed class RpcConnectionToClient<TListener> {
|
|
||||||
private readonly ServerSocket socket;
|
|
||||||
private readonly uint routingId;
|
|
||||||
|
|
||||||
private readonly MessageRegistry<TListener> messageRegistry;
|
|
||||||
private readonly MessageReplyTracker messageReplyTracker;
|
|
||||||
|
|
||||||
private volatile bool isAuthorized;
|
|
||||||
|
|
||||||
public bool IsAuthorized {
|
|
||||||
get => isAuthorized;
|
|
||||||
set => isAuthorized = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
|
||||||
public bool IsClosed { get; private set; }
|
|
||||||
|
|
||||||
internal RpcConnectionToClient(ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker messageReplyTracker) {
|
|
||||||
this.socket = socket;
|
|
||||||
this.routingId = routingId;
|
|
||||||
this.messageRegistry = messageRegistry;
|
|
||||||
this.messageReplyTracker = messageReplyTracker;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsSame(RpcConnectionToClient<TListener> other) {
|
|
||||||
return this.routingId == other.routingId && this.socket == other.socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Close() {
|
|
||||||
bool hasClosed = false;
|
|
||||||
|
|
||||||
lock (this) {
|
|
||||||
if (!IsClosed) {
|
|
||||||
IsClosed = true;
|
|
||||||
hasClosed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasClosed) {
|
|
||||||
Closed?.Invoke(this, new RpcClientConnectionClosedEventArgs(routingId));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
|
||||||
if (IsClosed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var bytes = messageRegistry.Write(message).ToArray();
|
|
||||||
if (bytes.Length > 0) {
|
|
||||||
await socket.SendAsync(routingId, bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
|
|
||||||
if (IsClosed) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var sequenceId = messageReplyTracker.RegisterReply();
|
|
||||||
|
|
||||||
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
|
||||||
if (bytes.Length == 0) {
|
|
||||||
messageReplyTracker.ForgetReply(sequenceId);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
await socket.SendAsync(routingId, bytes);
|
|
||||||
return await messageReplyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Receive(IReply message) {
|
|
||||||
messageReplyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,112 +0,0 @@
|
|||||||
using NetMQ.Sockets;
|
|
||||||
using Phantom.Utils.Rpc;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog;
|
|
||||||
using Serilog.Events;
|
|
||||||
|
|
||||||
namespace Phantom.Controller.Rpc;
|
|
||||||
|
|
||||||
public static class RpcRuntime {
|
|
||||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
|
||||||
return RpcRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal sealed class RpcRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
|
||||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
|
||||||
var socket = RpcServerSocket.Connect(config);
|
|
||||||
return new RpcRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
|
||||||
}
|
|
||||||
|
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
|
||||||
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
|
||||||
private readonly CancellationToken cancellationToken;
|
|
||||||
|
|
||||||
private RpcRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
|
||||||
this.messageDefinitions = messageDefinitions;
|
|
||||||
this.listenerFactory = listenerFactory;
|
|
||||||
this.cancellationToken = cancellationToken;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void Run(ServerSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
|
||||||
var clients = new Dictionary<ulong, Client>();
|
|
||||||
|
|
||||||
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
|
||||||
clients.Remove(e.RoutingId);
|
|
||||||
logger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!cancellationToken.IsCancellationRequested) {
|
|
||||||
var (routingId, data) = socket.Receive(cancellationToken);
|
|
||||||
|
|
||||||
if (data.Length == 0) {
|
|
||||||
LogMessageType(logger, routingId, data, messageType: null);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
|
||||||
|
|
||||||
if (!clients.TryGetValue(routingId, out var client)) {
|
|
||||||
if (!CheckIsRegistrationMessage(messageType, logger, routingId)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var connection = new RpcConnectionToClient<TClientListener>(socket, routingId, messageDefinitions.ToClient, replyTracker);
|
|
||||||
connection.Closed += OnConnectionClosed;
|
|
||||||
|
|
||||||
client = new Client(connection, messageDefinitions, listenerFactory(connection), logger, taskManager, cancellationToken);
|
|
||||||
clients[routingId] = client;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!client.Connection.IsAuthorized && !CheckIsRegistrationMessage(messageType, logger, routingId)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
LogMessageType(logger, routingId, data, messageType);
|
|
||||||
messageDefinitions.ToServer.Handle(data, client);
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var client in clients.Values) {
|
|
||||||
client.Connection.Closed -= OnConnectionClosed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void LogMessageType(ILogger logger, uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
|
|
||||||
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.Length > 0 && messageType != null) {
|
|
||||||
logger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
logger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private bool CheckIsRegistrationMessage(Type? messageType, ILogger logger, uint routingId) {
|
|
||||||
if (messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Warning("Received {MessageType} from {RoutingId} who is not registered.", messageType?.Name ?? "unknown message", routingId);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed class Client : MessageHandler<TServerListener> {
|
|
||||||
public RpcConnectionToClient<TClientListener> Connection { get; }
|
|
||||||
|
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
|
||||||
|
|
||||||
public Client(RpcConnectionToClient<TClientListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
|
||||||
this.Connection = connection;
|
|
||||||
this.messageDefinitions = messageDefinitions;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
|
||||||
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,5 +1,5 @@
|
|||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Controller.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Agents;
|
namespace Phantom.Controller.Services.Agents;
|
||||||
|
|
||||||
@ -22,7 +22,7 @@ sealed class AgentConnection {
|
|||||||
return connection.Send(message);
|
return connection.Send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||||
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
|
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,11 +5,11 @@ using Phantom.Common.Data.Replies;
|
|||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Controller.Database;
|
using Phantom.Controller.Database;
|
||||||
using Phantom.Controller.Rpc;
|
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Utils.Collections;
|
using Phantom.Utils.Collections;
|
||||||
using Phantom.Utils.Events;
|
using Phantom.Utils.Events;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
@ -126,12 +126,17 @@ sealed class AgentManager {
|
|||||||
|
|
||||||
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||||
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
|
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
|
||||||
if (connection == null) {
|
if (connection == null || agent == null) {
|
||||||
// TODO handle missing agent?
|
// TODO handle missing agent?
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Logger.Error(e, "Could not send message to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
|
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
|
||||||
|
@ -3,13 +3,13 @@ using Phantom.Common.Messages.Agent;
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Controller.Database;
|
using Phantom.Controller.Database;
|
||||||
using Phantom.Controller.Minecraft;
|
using Phantom.Controller.Minecraft;
|
||||||
using Phantom.Controller.Rpc;
|
|
||||||
using Phantom.Controller.Services.Agents;
|
using Phantom.Controller.Services.Agents;
|
||||||
using Phantom.Controller.Services.Events;
|
using Phantom.Controller.Services.Events;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Controller.Services.Rpc;
|
using Phantom.Controller.Services.Rpc;
|
||||||
using Phantom.Controller.Services.Users;
|
using Phantom.Controller.Services.Users;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services;
|
namespace Phantom.Controller.Services;
|
||||||
|
@ -4,11 +4,11 @@ using Phantom.Common.Messages.Agent;
|
|||||||
using Phantom.Common.Messages.Agent.BiDirectional;
|
using Phantom.Common.Messages.Agent.BiDirectional;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Controller.Rpc;
|
|
||||||
using Phantom.Controller.Services.Agents;
|
using Phantom.Controller.Services.Agents;
|
||||||
using Phantom.Controller.Services.Events;
|
using Phantom.Controller.Services.Events;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
@ -36,10 +36,11 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
|
|||||||
|
|
||||||
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
|
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
|
||||||
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) {
|
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) {
|
||||||
|
connection.SetAuthorizationResult(false);
|
||||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
||||||
}
|
}
|
||||||
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
|
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) {
|
||||||
connection.IsAuthorized = true;
|
connection.SetAuthorizationResult(true);
|
||||||
agentGuidWaiter.SetResult(message.AgentInfo.Guid);
|
agentGuidWaiter.SetResult(message.AgentInfo.Guid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,13 +13,13 @@ using Phantom.Common.Messages.Web.BiDirectional;
|
|||||||
using Phantom.Common.Messages.Web.ToController;
|
using Phantom.Common.Messages.Web.ToController;
|
||||||
using Phantom.Common.Messages.Web.ToWeb;
|
using Phantom.Common.Messages.Web.ToWeb;
|
||||||
using Phantom.Controller.Minecraft;
|
using Phantom.Controller.Minecraft;
|
||||||
using Phantom.Controller.Rpc;
|
|
||||||
using Phantom.Controller.Services.Agents;
|
using Phantom.Controller.Services.Agents;
|
||||||
using Phantom.Controller.Services.Events;
|
using Phantom.Controller.Services.Events;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Controller.Services.Users;
|
using Phantom.Controller.Services.Users;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
@ -108,11 +108,12 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
|
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
|
||||||
if (authToken.FixedTimeEquals(message.AuthToken)) {
|
if (authToken.FixedTimeEquals(message.AuthToken)) {
|
||||||
Logger.Information("Web authorized successfully.");
|
Logger.Information("Web authorized successfully.");
|
||||||
connection.IsAuthorized = true;
|
connection.SetAuthorizationResult(true);
|
||||||
await connection.Send(new RegisterWebResultMessage(true));
|
await connection.Send(new RegisterWebResultMessage(true));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Logger.Warning("Web failed to authorize, invalid token.");
|
Logger.Warning("Web failed to authorize, invalid token.");
|
||||||
|
connection.SetAuthorizationResult(false);
|
||||||
await connection.Send(new RegisterWebResultMessage(false));
|
await connection.Send(new RegisterWebResultMessage(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,11 +4,11 @@ using Phantom.Common.Messages.Agent;
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Controller;
|
using Phantom.Controller;
|
||||||
using Phantom.Controller.Database.Postgres;
|
using Phantom.Controller.Database.Postgres;
|
||||||
using Phantom.Controller.Rpc;
|
|
||||||
using Phantom.Controller.Services;
|
using Phantom.Controller.Services;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
@ -59,15 +59,17 @@ try {
|
|||||||
await controllerServices.Initialize();
|
await controllerServices.Initialize();
|
||||||
|
|
||||||
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
||||||
return new RpcConfiguration(PhantomLogger.Create("Rpc", serviceName), PhantomLogger.Create<TaskManager>("Rpc", serviceName), host, port, connectionKey.Certificate);
|
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
||||||
try {
|
try {
|
||||||
await Task.WhenAll(
|
await Task.WhenAll(
|
||||||
RpcRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
||||||
RpcRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
||||||
);
|
);
|
||||||
} finally {
|
} finally {
|
||||||
|
await rpcTaskManager.Stop();
|
||||||
NetMQConfig.Cleanup();
|
NetMQConfig.Cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public interface IMessage<TListener, TReply> {
|
public interface IMessage<TListener, TReply> {
|
||||||
|
MessageQueueKey QueueKey { get; }
|
||||||
Task<TReply> Accept(TListener listener);
|
Task<TReply> Accept(TListener listener);
|
||||||
}
|
}
|
||||||
|
@ -1,38 +1,41 @@
|
|||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Logging;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public abstract class MessageHandler<TListener> {
|
abstract class MessageHandler<TListener> {
|
||||||
private readonly TListener listener;
|
protected ILogger Logger { get; }
|
||||||
private readonly ILogger logger;
|
|
||||||
private readonly TaskManager taskManager;
|
|
||||||
private readonly CancellationToken cancellationToken;
|
|
||||||
|
|
||||||
protected MessageHandler(TListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) {
|
private readonly TListener listener;
|
||||||
|
private readonly MessageQueues messageQueues;
|
||||||
|
|
||||||
|
protected MessageHandler(string loggerName, TListener listener) {
|
||||||
|
this.Logger = PhantomLogger.Create("MessageHandler", loggerName);
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
this.logger = logger;
|
this.messageQueues = new MessageQueues(loggerName + ":Receive");
|
||||||
this.taskManager = taskManager;
|
|
||||||
this.cancellationToken = cancellationToken;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
messageQueues.Enqueue(message.QueueKey, () => TryHandle<TMessage, TReply>(sequenceId, message));
|
||||||
taskManager.Run("Handle message " + message.GetType().Name, async () => {
|
}
|
||||||
try {
|
|
||||||
await Handle<TMessage, TReply>(sequenceId, message);
|
private async Task TryHandle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
} catch (Exception e) {
|
TReply reply;
|
||||||
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
try {
|
||||||
}
|
reply = await message.Accept(listener);
|
||||||
});
|
} catch (Exception e) {
|
||||||
|
Logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
|
||||||
TReply reply = await message.Accept(listener);
|
|
||||||
if (reply is not NoReply) {
|
if (reply is not NoReply) {
|
||||||
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
|
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
|
protected abstract Task SendReply(uint sequenceId, byte[] serializedReply);
|
||||||
|
|
||||||
|
internal Task StopReceiving() {
|
||||||
|
return messageQueues.Stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
9
Utils/Phantom.Utils.Rpc/Message/MessageQueueKey.cs
Normal file
9
Utils/Phantom.Utils.Rpc/Message/MessageQueueKey.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
public sealed class MessageQueueKey {
|
||||||
|
public string Name { get; }
|
||||||
|
|
||||||
|
public MessageQueueKey(string name) {
|
||||||
|
Name = name;
|
||||||
|
}
|
||||||
|
}
|
53
Utils/Phantom.Utils.Rpc/Message/MessageQueues.cs
Normal file
53
Utils/Phantom.Utils.Rpc/Message/MessageQueues.cs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Tasks;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
sealed class MessageQueues {
|
||||||
|
private readonly ILogger logger;
|
||||||
|
private readonly TaskManager taskManager;
|
||||||
|
private readonly Dictionary<MessageQueueKey, RpcQueue> queues = new ();
|
||||||
|
|
||||||
|
private Task? stopTask;
|
||||||
|
|
||||||
|
public MessageQueues(string loggerName) {
|
||||||
|
this.logger = PhantomLogger.Create<MessageQueues>(loggerName);
|
||||||
|
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(loggerName));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RpcQueue GetOrCreateQueue(MessageQueueKey key) {
|
||||||
|
if (!queues.TryGetValue(key, out var queue)) {
|
||||||
|
queues[key] = queue = new RpcQueue(taskManager, "Message queue for " + key.Name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Enqueue(MessageQueueKey key, Func<Task> task) {
|
||||||
|
lock (this) {
|
||||||
|
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException(new OperationCanceledException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<T> Enqueue<T>(MessageQueueKey key, Func<Task<T>> task) {
|
||||||
|
lock (this) {
|
||||||
|
return stopTask == null ? GetOrCreateQueue(key).Enqueue(task) : Task.FromException<T>(new OperationCanceledException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Task Stop() {
|
||||||
|
lock (this) {
|
||||||
|
if (stopTask == null) {
|
||||||
|
logger.Debug("Stopping " + queues.Count + " message queue(s)...");
|
||||||
|
|
||||||
|
stopTask = Task.WhenAll(queues.Values.Select(static queue => queue.Stop()))
|
||||||
|
.ContinueWith(_ => logger.Debug("All queues stopped."));
|
||||||
|
|
||||||
|
queues.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
return stopTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -36,7 +36,7 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
codeToTypeMapping.Add(code, typeof(TMessage));
|
codeToTypeMapping.Add(code, typeof(TMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
internal bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
||||||
try {
|
try {
|
||||||
var code = MessageSerializer.ReadCode(ref data);
|
var code = MessageSerializer.ReadCode(ref data);
|
||||||
return codeToTypeMapping.TryGetValue(code, out type);
|
return codeToTypeMapping.TryGetValue(code, out type);
|
||||||
@ -78,7 +78,7 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
|
internal void Handle(ReadOnlyMemory<byte> data, MessageHandler<TListener> handler) {
|
||||||
ushort code;
|
ushort code;
|
||||||
try {
|
try {
|
||||||
code = MessageSerializer.ReadCode(ref data);
|
code = MessageSerializer.ReadCode(ref data);
|
||||||
|
@ -1,17 +1,18 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public sealed class MessageReplyTracker {
|
sealed class MessageReplyTracker {
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
|
private readonly ConcurrentDictionary<uint, TaskCompletionSource<byte[]>> replyTasks = new (4, 16);
|
||||||
|
|
||||||
private uint lastSequenceId;
|
private uint lastSequenceId;
|
||||||
|
|
||||||
internal MessageReplyTracker(ILogger logger) {
|
internal MessageReplyTracker(string loggerName) {
|
||||||
this.logger = logger;
|
this.logger = PhantomLogger.Create<MessageReplyTracker>(loggerName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public uint RegisterReply() {
|
public uint RegisterReply() {
|
||||||
@ -43,14 +44,6 @@ public sealed class MessageReplyTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<TReply?> TryWaitForReply<TReply>(uint sequenceId, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TReply : class {
|
|
||||||
try {
|
|
||||||
return await WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
|
|
||||||
} catch (Exception) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void ForgetReply(uint sequenceId) {
|
public void ForgetReply(uint sequenceId) {
|
||||||
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
if (replyTasks.TryRemove(sequenceId, out var task)) {
|
||||||
task.SetCanceled();
|
task.SetCanceled();
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
||||||
|
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using Serilog;
|
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
public sealed record RpcConfiguration(ILogger RuntimeLogger, ILogger TaskManagerLogger, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||||
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||||
}
|
}
|
||||||
|
@ -1,54 +0,0 @@
|
|||||||
using NetMQ;
|
|
||||||
using NetMQ.Sockets;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
|
||||||
|
|
||||||
public sealed class RpcConnectionToServer<TListener> {
|
|
||||||
private readonly ClientSocket socket;
|
|
||||||
private readonly MessageRegistry<TListener> messageRegistry;
|
|
||||||
private readonly MessageReplyTracker replyTracker;
|
|
||||||
|
|
||||||
internal RpcConnectionToServer(ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
|
|
||||||
this.socket = socket;
|
|
||||||
this.messageRegistry = messageRegistry;
|
|
||||||
this.replyTracker = replyTracker;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
|
||||||
var bytes = messageRegistry.Write(message).ToArray();
|
|
||||||
if (bytes.Length > 0) {
|
|
||||||
await socket.SendAsync(bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<TReply?> TrySend<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> where TReply : class {
|
|
||||||
var sequenceId = replyTracker.RegisterReply();
|
|
||||||
|
|
||||||
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
|
||||||
if (bytes.Length == 0) {
|
|
||||||
replyTracker.ForgetReply(sequenceId);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
await socket.SendAsync(bytes);
|
|
||||||
return await replyTracker.TryWaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
|
|
||||||
var sequenceId = replyTracker.RegisterReply();
|
|
||||||
|
|
||||||
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
|
||||||
if (bytes.Length == 0) {
|
|
||||||
replyTracker.ForgetReply(sequenceId);
|
|
||||||
throw new ArgumentException("Could not write message.", nameof(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
await socket.SendAsync(bytes);
|
|
||||||
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Receive(IReply message) {
|
|
||||||
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,7 +3,7 @@ using NetMQ.Sockets;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
public static class RpcExtensions {
|
static class RpcExtensions {
|
||||||
public static ReadOnlyMemory<byte> Receive(this ClientSocket socket, CancellationToken cancellationToken) {
|
public static ReadOnlyMemory<byte> Receive(this ClientSocket socket, CancellationToken cancellationToken) {
|
||||||
var msg = new Msg();
|
var msg = new Msg();
|
||||||
msg.InitEmpty();
|
msg.InitEmpty();
|
||||||
|
60
Utils/Phantom.Utils.Rpc/RpcQueue.cs
Normal file
60
Utils/Phantom.Utils.Rpc/RpcQueue.cs
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
using System.Threading.Channels;
|
||||||
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
|
sealed class RpcQueue {
|
||||||
|
private readonly Channel<Func<Task>> channel = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions {
|
||||||
|
SingleReader = true,
|
||||||
|
SingleWriter = false,
|
||||||
|
AllowSynchronousContinuations = false
|
||||||
|
});
|
||||||
|
|
||||||
|
private readonly Task processingTask;
|
||||||
|
|
||||||
|
public RpcQueue(TaskManager taskManager, string taskName) {
|
||||||
|
this.processingTask = taskManager.Run(taskName, Process);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Enqueue(Action action) {
|
||||||
|
return Enqueue(() => {
|
||||||
|
action();
|
||||||
|
return Task.CompletedTask;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Enqueue(Func<Task> task) {
|
||||||
|
var completionSource = AsyncTasks.CreateCompletionSource();
|
||||||
|
|
||||||
|
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
|
||||||
|
completionSource.SetCanceled();
|
||||||
|
}
|
||||||
|
|
||||||
|
return completionSource.Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<T> Enqueue<T>(Func<Task<T>> task) {
|
||||||
|
var completionSource = AsyncTasks.CreateCompletionSource<T>();
|
||||||
|
|
||||||
|
if (!channel.Writer.TryWrite(() => task().ContinueWith(t => completionSource.SetResultFrom(t)))) {
|
||||||
|
completionSource.SetCanceled();
|
||||||
|
}
|
||||||
|
|
||||||
|
return completionSource.Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task Process() {
|
||||||
|
try {
|
||||||
|
await foreach (var task in channel.Reader.ReadAllAsync()) {
|
||||||
|
await task();
|
||||||
|
}
|
||||||
|
} catch (OperationCanceledException) {
|
||||||
|
// Ignore.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Stop() {
|
||||||
|
channel.Writer.Complete();
|
||||||
|
return processingTask;
|
||||||
|
}
|
||||||
|
}
|
@ -1,49 +0,0 @@
|
|||||||
using NetMQ;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog;
|
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
|
||||||
|
|
||||||
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
|
||||||
private readonly TSocket socket;
|
|
||||||
private readonly ILogger runtimeLogger;
|
|
||||||
private readonly MessageReplyTracker replyTracker;
|
|
||||||
private readonly TaskManager taskManager;
|
|
||||||
|
|
||||||
protected RpcRuntime(RpcSocket<TSocket> socket) {
|
|
||||||
this.socket = socket.Socket;
|
|
||||||
this.runtimeLogger = socket.Config.RuntimeLogger;
|
|
||||||
this.replyTracker = socket.ReplyTracker;
|
|
||||||
this.taskManager = new TaskManager(socket.Config.TaskManagerLogger);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected async Task Launch() {
|
|
||||||
void RunTask() {
|
|
||||||
try {
|
|
||||||
Run(socket, runtimeLogger, replyTracker, taskManager);
|
|
||||||
} catch (Exception e) {
|
|
||||||
runtimeLogger.Error(e, "Caught exception in RPC thread.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
|
||||||
} catch (OperationCanceledException) {
|
|
||||||
// Ignore.
|
|
||||||
} finally {
|
|
||||||
await taskManager.Stop();
|
|
||||||
await Disconnect(socket, runtimeLogger);
|
|
||||||
|
|
||||||
socket.Dispose();
|
|
||||||
runtimeLogger.Information("ZeroMQ runtime stopped.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void Run(TSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager);
|
|
||||||
|
|
||||||
protected virtual Task Disconnect(TSocket socket, ILogger logger) {
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,6 @@
|
|||||||
namespace Phantom.Controller.Rpc;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
public sealed class RpcClientConnectionClosedEventArgs : EventArgs {
|
sealed class RpcClientConnectionClosedEventArgs : EventArgs {
|
||||||
internal uint RoutingId { get; }
|
internal uint RoutingId { get; }
|
||||||
|
|
||||||
internal RpcClientConnectionClosedEventArgs(uint routingId) {
|
internal RpcClientConnectionClosedEventArgs(uint routingId) {
|
@ -1,11 +1,10 @@
|
|||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog;
|
using Serilog;
|
||||||
using Serilog.Events;
|
using Serilog.Events;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ClientSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
private readonly RpcConnectionToServer<TServerListener> connection;
|
private readonly RpcConnectionToServer<TServerListener> connection;
|
||||||
@ -23,18 +22,18 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
this.receiveCancellationToken = receiveCancellationToken;
|
this.receiveCancellationToken = receiveCancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected sealed override void Run(ClientSocket socket, ILogger logger, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
private protected sealed override Task Run(ClientSocket socket) {
|
||||||
RunWithConnection(socket, connection, logger, taskManager);
|
return RunWithConnection(socket, connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual void RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection, ILogger logger, TaskManager taskManager) {
|
protected virtual async Task RunWithConnection(ClientSocket socket, RpcConnectionToServer<TServerListener> connection) {
|
||||||
var handler = new Handler(connection, messageDefinitions, messageListener, logger, taskManager, receiveCancellationToken);
|
var handler = new Handler(LoggerName, connection, messageDefinitions, messageListener);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (!receiveCancellationToken.IsCancellationRequested) {
|
while (!receiveCancellationToken.IsCancellationRequested) {
|
||||||
var data = socket.Receive(receiveCancellationToken);
|
var data = socket.Receive(receiveCancellationToken);
|
||||||
|
|
||||||
LogMessageType(logger, data);
|
LogMessageType(RuntimeLogger, data);
|
||||||
|
|
||||||
if (data.Length > 0) {
|
if (data.Length > 0) {
|
||||||
messageDefinitions.ToClient.Handle(data, handler);
|
messageDefinitions.ToClient.Handle(data, handler);
|
||||||
@ -43,11 +42,25 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
} finally {
|
} finally {
|
||||||
logger.Debug("ZeroMQ client stopped receiving messages.");
|
await handler.StopReceiving();
|
||||||
disconnectSemaphore.Wait(CancellationToken.None);
|
RuntimeLogger.Debug("ZeroMQ client stopped receiving messages.");
|
||||||
|
|
||||||
|
await disconnectSemaphore.WaitAsync(CancellationToken.None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private protected sealed override async Task Disconnect(ClientSocket socket) {
|
||||||
|
try {
|
||||||
|
await connection.StopSending().WaitAsync(TimeSpan.FromSeconds(10), CancellationToken.None);
|
||||||
|
} catch (TimeoutException) {
|
||||||
|
RuntimeLogger.Error("Timed out waiting for message sending queue.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await SendDisconnectMessage(socket, RuntimeLogger);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract Task SendDisconnectMessage(ClientSocket socket, ILogger logger);
|
||||||
|
|
||||||
private void LogMessageType(ILogger logger, ReadOnlyMemory<byte> data) {
|
private void LogMessageType(ILogger logger, ReadOnlyMemory<byte> data) {
|
||||||
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
if (!logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||||
return;
|
return;
|
||||||
@ -65,7 +78,7 @@ public abstract class RpcClientRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
private readonly RpcConnectionToServer<TServerListener> connection;
|
private readonly RpcConnectionToServer<TServerListener> connection;
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
|
||||||
public Handler(RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener, ILogger logger, TaskManager taskManager, CancellationToken cancellationToken) : base(listener, logger, taskManager, cancellationToken) {
|
public Handler(string loggerName, RpcConnectionToServer<TServerListener> connection, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TClientListener listener) : base(loggerName, listener) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
}
|
}
|
53
Utils/Phantom.Utils.Rpc/Runtime/RpcConnection.cs
Normal file
53
Utils/Phantom.Utils.Rpc/Runtime/RpcConnection.cs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public abstract class RpcConnection<TListener> {
|
||||||
|
private readonly MessageRegistry<TListener> messageRegistry;
|
||||||
|
private readonly MessageQueues sendingQueues;
|
||||||
|
private readonly MessageReplyTracker replyTracker;
|
||||||
|
|
||||||
|
internal RpcConnection(string loggerName, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) {
|
||||||
|
this.messageRegistry = messageRegistry;
|
||||||
|
this.sendingQueues = new MessageQueues(loggerName + ":Send");
|
||||||
|
this.replyTracker = replyTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected abstract ValueTask Send(byte[] bytes);
|
||||||
|
|
||||||
|
public Task Send<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
||||||
|
return sendingQueues.Enqueue(message.QueueKey, () => SendTask(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
|
||||||
|
return sendingQueues.Enqueue(message.QueueKey, () => SendTask<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task SendTask<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
||||||
|
var bytes = messageRegistry.Write(message).ToArray();
|
||||||
|
if (bytes.Length > 0) {
|
||||||
|
await Send(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<TReply> SendTask<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessage<TListener, TReply> {
|
||||||
|
var sequenceId = replyTracker.RegisterReply();
|
||||||
|
|
||||||
|
var bytes = messageRegistry.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
||||||
|
if (bytes.Length == 0) {
|
||||||
|
replyTracker.ForgetReply(sequenceId);
|
||||||
|
throw new ArgumentException("Could not write message.", nameof(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
await Send(bytes);
|
||||||
|
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, waitForReplyCancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Receive(IReply message) {
|
||||||
|
replyTracker.ReceiveReply(message.SequenceId, message.SerializedReply);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Task StopSending() {
|
||||||
|
return sendingQueues.Stop();
|
||||||
|
}
|
||||||
|
}
|
51
Utils/Phantom.Utils.Rpc/Runtime/RpcConnectionToClient.cs
Normal file
51
Utils/Phantom.Utils.Rpc/Runtime/RpcConnectionToClient.cs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
using NetMQ;
|
||||||
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public sealed class RpcConnectionToClient<TListener> : RpcConnection<TListener> {
|
||||||
|
private readonly ServerSocket socket;
|
||||||
|
private readonly uint routingId;
|
||||||
|
|
||||||
|
private readonly TaskCompletionSource<bool> authorizationCompletionSource = new ();
|
||||||
|
|
||||||
|
internal event EventHandler<RpcClientConnectionClosedEventArgs>? Closed;
|
||||||
|
public bool IsClosed { get; private set; }
|
||||||
|
|
||||||
|
internal RpcConnectionToClient(string loggerName, ServerSocket socket, uint routingId, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
|
||||||
|
this.socket = socket;
|
||||||
|
this.routingId = routingId;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Task<bool> GetAuthorization() {
|
||||||
|
return authorizationCompletionSource.Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SetAuthorizationResult(bool isAuthorized) {
|
||||||
|
authorizationCompletionSource.SetResult(isAuthorized);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsSame(RpcConnectionToClient<TListener> other) {
|
||||||
|
return this.routingId == other.routingId && this.socket == other.socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Close() {
|
||||||
|
bool hasClosed = false;
|
||||||
|
|
||||||
|
lock (this) {
|
||||||
|
if (!IsClosed) {
|
||||||
|
IsClosed = true;
|
||||||
|
hasClosed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasClosed) {
|
||||||
|
Closed?.Invoke(this, new RpcClientConnectionClosedEventArgs(routingId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected override ValueTask Send(byte[] bytes) {
|
||||||
|
return socket.SendAsync(routingId, bytes);
|
||||||
|
}
|
||||||
|
}
|
17
Utils/Phantom.Utils.Rpc/Runtime/RpcConnectionToServer.cs
Normal file
17
Utils/Phantom.Utils.Rpc/Runtime/RpcConnectionToServer.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
using NetMQ;
|
||||||
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
|
||||||
|
private readonly ClientSocket socket;
|
||||||
|
|
||||||
|
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
|
||||||
|
this.socket = socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected override ValueTask Send(byte[] bytes) {
|
||||||
|
return socket.SendAsync(bytes);
|
||||||
|
}
|
||||||
|
}
|
50
Utils/Phantom.Utils.Rpc/Runtime/RpcRuntime.cs
Normal file
50
Utils/Phantom.Utils.Rpc/Runtime/RpcRuntime.cs
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
using System.Diagnostics.CodeAnalysis;
|
||||||
|
using NetMQ;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket {
|
||||||
|
private readonly TSocket socket;
|
||||||
|
|
||||||
|
private protected string LoggerName { get; }
|
||||||
|
private protected ILogger RuntimeLogger { get; }
|
||||||
|
private protected MessageReplyTracker ReplyTracker { get; }
|
||||||
|
|
||||||
|
protected RpcRuntime(RpcSocket<TSocket> socket) {
|
||||||
|
this.socket = socket.Socket;
|
||||||
|
|
||||||
|
this.LoggerName = socket.Config.LoggerName;
|
||||||
|
this.RuntimeLogger = PhantomLogger.Create(LoggerName);
|
||||||
|
this.ReplyTracker = socket.ReplyTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected async Task Launch() {
|
||||||
|
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
|
||||||
|
async Task RunTask() {
|
||||||
|
try {
|
||||||
|
await Run(socket);
|
||||||
|
} catch (Exception e) {
|
||||||
|
RuntimeLogger.Error(e, "Caught exception in RPC thread.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await Task.Factory.StartNew(RunTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
|
||||||
|
} catch (OperationCanceledException) {
|
||||||
|
// Ignore.
|
||||||
|
} finally {
|
||||||
|
await Disconnect(socket);
|
||||||
|
|
||||||
|
socket.Dispose();
|
||||||
|
RuntimeLogger.Information("ZeroMQ runtime stopped.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected abstract Task Run(TSocket socket);
|
||||||
|
|
||||||
|
private protected abstract Task Disconnect(TSocket socket);
|
||||||
|
}
|
145
Utils/Phantom.Utils.Rpc/Runtime/RpcServerRuntime.cs
Normal file
145
Utils/Phantom.Utils.Rpc/Runtime/RpcServerRuntime.cs
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
|
using Phantom.Utils.Tasks;
|
||||||
|
using Serilog.Events;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
public static class RpcServerRuntime {
|
||||||
|
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
|
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
|
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
||||||
|
var socket = RpcServerSocket.Connect(config);
|
||||||
|
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
||||||
|
private readonly TaskManager taskManager;
|
||||||
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
||||||
|
this.messageDefinitions = messageDefinitions;
|
||||||
|
this.listenerFactory = listenerFactory;
|
||||||
|
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected override Task Run(ServerSocket socket) {
|
||||||
|
var clients = new ConcurrentDictionary<ulong, Client>();
|
||||||
|
|
||||||
|
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||||
|
if (!clients.Remove(e.RoutingId, out var client)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
RuntimeLogger.Debug("Closing connection to {RoutingId}.", e.RoutingId);
|
||||||
|
client.Connection.Closed -= OnConnectionClosed;
|
||||||
|
|
||||||
|
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
|
||||||
|
await client.StopReceiving();
|
||||||
|
await client.StopProcessing();
|
||||||
|
await client.Connection.StopSending();
|
||||||
|
RuntimeLogger.Debug("Closed connection to {RoutingId}.", e.RoutingId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!cancellationToken.IsCancellationRequested) {
|
||||||
|
var (routingId, data) = socket.Receive(cancellationToken);
|
||||||
|
|
||||||
|
if (data.Length == 0) {
|
||||||
|
LogMessageType(routingId, data, messageType: null);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
||||||
|
|
||||||
|
if (!clients.TryGetValue(routingId, out var client)) {
|
||||||
|
var clientLoggerName = LoggerName + ":" + routingId;
|
||||||
|
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
|
||||||
|
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||||
|
|
||||||
|
connection.Closed += OnConnectionClosed;
|
||||||
|
|
||||||
|
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection));
|
||||||
|
clients[routingId] = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
LogMessageType(routingId, data, messageType);
|
||||||
|
client.Enqueue(messageType, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var client in clients.Values) {
|
||||||
|
client.Connection.Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private protected override Task Disconnect(ServerSocket socket) {
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void LogMessageType(uint routingId, ReadOnlyMemory<byte> data, Type? messageType) {
|
||||||
|
if (!RuntimeLogger.IsEnabled(LogEventLevel.Verbose)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.Length > 0 && messageType != null) {
|
||||||
|
RuntimeLogger.Verbose("Received {MessageType} ({Bytes} B) from {RoutingId}.", messageType.Name, data.Length, routingId);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
RuntimeLogger.Verbose("Received {Bytes} B message from {RoutingId}.", data.Length, routingId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class Client : MessageHandler<TServerListener> {
|
||||||
|
public RpcConnectionToClient<TClientListener> Connection { get; }
|
||||||
|
|
||||||
|
private readonly RpcQueue processingQueue;
|
||||||
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
|
||||||
|
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener) : base(loggerName, listener) {
|
||||||
|
this.Connection = connection;
|
||||||
|
this.processingQueue = processingQueue;
|
||||||
|
this.messageDefinitions = messageDefinitions;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void Enqueue(Type? messageType, ReadOnlyMemory<byte> data) {
|
||||||
|
if (!Connection.GetAuthorization().IsCompleted && messageType != null && messageDefinitions.IsRegistrationMessage(messageType)) {
|
||||||
|
processingQueue.Enqueue(() => Handle(data));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Handle(ReadOnlyMemory<byte> data) {
|
||||||
|
messageDefinitions.ToServer.Handle(data, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
|
||||||
|
if (await Connection.GetAuthorization()) {
|
||||||
|
Handle(data);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Logger.Warning("Dropped message after failed registration.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||||
|
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Task StopProcessing() {
|
||||||
|
return processingQueue.Stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,8 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Sockets;
|
namespace Phantom.Utils.Rpc.Sockets;
|
||||||
|
|
||||||
@ -21,7 +23,7 @@ public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMess
|
|||||||
RpcSocket.SetDefaultSocketOptions(options);
|
RpcSocket.SetDefaultSocketOptions(options);
|
||||||
|
|
||||||
var url = config.TcpUrl;
|
var url = config.TcpUrl;
|
||||||
var logger = config.RuntimeLogger;
|
var logger = PhantomLogger.Create(config.LoggerName);
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
||||||
socket.Connect(url);
|
socket.Connect(url);
|
||||||
@ -35,6 +37,6 @@ public sealed class RpcClientSocket<TClientListener, TServerListener, TReplyMess
|
|||||||
|
|
||||||
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
|
private RpcClientSocket(ClientSocket socket, RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions) : base(socket, config) {
|
||||||
MessageDefinitions = messageDefinitions;
|
MessageDefinitions = messageDefinitions;
|
||||||
Connection = new RpcConnectionToServer<TServerListener>(socket, messageDefinitions.ToServer, ReplyTracker);
|
Connection = new RpcConnectionToServer<TServerListener>(config.LoggerName, socket, messageDefinitions.ToServer, ReplyTracker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Sockets;
|
namespace Phantom.Utils.Rpc.Sockets;
|
||||||
|
|
||||||
@ -12,7 +13,7 @@ public sealed class RpcServerSocket : RpcSocket<ServerSocket> {
|
|||||||
RpcSocket.SetDefaultSocketOptions(options);
|
RpcSocket.SetDefaultSocketOptions(options);
|
||||||
|
|
||||||
var url = config.TcpUrl;
|
var url = config.TcpUrl;
|
||||||
var logger = config.RuntimeLogger;
|
var logger = PhantomLogger.Create(config.LoggerName);
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
||||||
socket.Bind(url);
|
socket.Bind(url);
|
||||||
|
@ -20,6 +20,6 @@ public abstract class RpcSocket<TSocket> where TSocket : ThreadSafeSocket {
|
|||||||
protected RpcSocket(TSocket socket, RpcConfiguration config) {
|
protected RpcSocket(TSocket socket, RpcConfiguration config) {
|
||||||
Socket = socket;
|
Socket = socket;
|
||||||
Config = config;
|
Config = config;
|
||||||
ReplyTracker = new MessageReplyTracker(config.RuntimeLogger);
|
ReplyTracker = new MessageReplyTracker(config.LoggerName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,4 +8,28 @@ public static class AsyncTasks {
|
|||||||
public static TaskCompletionSource<T> CreateCompletionSource<T>() {
|
public static TaskCompletionSource<T> CreateCompletionSource<T>() {
|
||||||
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void SetResultFrom(this TaskCompletionSource completionSource, Task task) {
|
||||||
|
if (task.IsFaulted) {
|
||||||
|
completionSource.SetException(task.Exception.InnerExceptions);
|
||||||
|
}
|
||||||
|
else if (task.IsCanceled) {
|
||||||
|
completionSource.SetCanceled();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
completionSource.SetResult();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void SetResultFrom<T>(this TaskCompletionSource<T> completionSource, Task<T> task) {
|
||||||
|
if (task.IsFaulted) {
|
||||||
|
completionSource.SetException(task.Exception.InnerExceptions);
|
||||||
|
}
|
||||||
|
else if (task.IsCanceled) {
|
||||||
|
completionSource.SetCanceled();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
completionSource.SetResult(task.Result);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
namespace Phantom.Web.Services.Rpc;
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Common.Messages.Web.BiDirectional;
|
using Phantom.Common.Messages.Web.BiDirectional;
|
||||||
using Phantom.Common.Messages.Web.ToWeb;
|
using Phantom.Common.Messages.Web.ToWeb;
|
||||||
using Phantom.Utils.Rpc;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Phantom.Web.Services.Agents;
|
using Phantom.Web.Services.Agents;
|
||||||
using Phantom.Web.Services.Instances;
|
using Phantom.Web.Services.Instances;
|
||||||
|
@ -3,7 +3,7 @@ using NetMQ.Sockets;
|
|||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Common.Messages.Web.BiDirectional;
|
using Phantom.Common.Messages.Web.BiDirectional;
|
||||||
using Phantom.Common.Messages.Web.ToController;
|
using Phantom.Common.Messages.Web.ToController;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
using ILogger = Serilog.ILogger;
|
using ILogger = Serilog.ILogger;
|
||||||
|
|
||||||
@ -16,7 +16,7 @@ public sealed class RpcClientRuntime : RpcClientRuntime<IMessageToWebListener, I
|
|||||||
|
|
||||||
private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
private RpcClientRuntime(RpcClientSocket<IMessageToWebListener, IMessageToControllerListener, ReplyMessage> socket, IMessageToWebListener messageListener, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, messageListener, disconnectSemaphore, receiveCancellationToken) {}
|
||||||
|
|
||||||
protected override async Task Disconnect(ClientSocket socket, ILogger logger) {
|
protected override async Task SendDisconnectMessage(ClientSocket socket, ILogger logger) {
|
||||||
var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray();
|
var unregisterMessageBytes = WebMessageRegistries.ToController.Write(new UnregisterWebMessage()).ToArray();
|
||||||
try {
|
try {
|
||||||
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
await socket.SendAsync(unregisterMessageBytes).AsTask().WaitAsync(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||||
|
@ -48,7 +48,7 @@ try {
|
|||||||
|
|
||||||
var (controllerCertificate, webToken) = webKey.Value;
|
var (controllerCertificate, webToken) = webKey.Value;
|
||||||
|
|
||||||
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), controllerHost, controllerPort, controllerCertificate);
|
var rpcConfiguration = new RpcConfiguration("Rpc", controllerHost, controllerPort, controllerCertificate);
|
||||||
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
|
var rpcSocket = RpcClientSocket.Connect(rpcConfiguration, WebMessageRegistries.Definitions, new RegisterWebMessage(webToken));
|
||||||
|
|
||||||
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
|
var configuration = new Configuration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, dataProtectionKeysPath, shutdownCancellationToken);
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
using Microsoft.AspNetCore.DataProtection;
|
using Microsoft.AspNetCore.DataProtection;
|
||||||
using Phantom.Common.Messages.Web;
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
using Phantom.Web.Base;
|
using Phantom.Web.Base;
|
||||||
using Phantom.Web.Services;
|
using Phantom.Web.Services;
|
||||||
|
Loading…
Reference in New Issue
Block a user