1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-16 09:24:49 +02:00

1 Commits

Author SHA1 Message Date
dcfe66a337 Move RPC message receiving to Akka.NET 2024-03-15 22:00:37 +01:00
8 changed files with 120 additions and 58 deletions

View File

@@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services; namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable { public sealed class ControllerServices : IAsyncDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; } private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; } private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; } private MinecraftVersions MinecraftVersions { get; }
@@ -33,7 +35,6 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserLoginManager UserLoginManager { get; } private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; } private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
private readonly IDbContextProvider dbProvider; private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken; private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
@@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
this.webAuthToken = webAuthToken; this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken; this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller"); this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>()); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState(); this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions(); this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken); this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager(); this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider); this.UserManager = new UserManager(dbProvider);
@@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
} }
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) { public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager); return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
} }
public async Task Initialize() { public async Task Initialize() {
@@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
} }
public async ValueTask DisposeAsync() { public async ValueTask DisposeAsync() {
await actorSystem.Terminate(); await ActorSystem.Terminate();
actorSystem.Dispose(); ActorSystem.Dispose();
} }
} }

View File

@@ -55,23 +55,22 @@ try {
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString); var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) { await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize(); await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate); return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
} }
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc")); var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try { try {
await Task.WhenAll( await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken), RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
); );
} finally { } finally {
await rpcTaskManager.Stop(); await rpcTaskManager.Stop();
NetMQConfig.Cleanup(); NetMQConfig.Cleanup();
}
} }
return 0; return 0;

View File

@@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration { public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; } public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; } public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) { internal Props Apply(Props props) {
if (SupervisorStrategy != null) { if (SupervisorStrategy != null) {
@@ -15,6 +16,10 @@ public readonly struct ActorConfiguration {
props = props.WithMailbox(MailboxType); props = props.WithMailbox(MailboxType);
} }
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props; return props;
} }
} }

View File

@@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) { public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken); return Request(message, timeout: null, cancellationToken);
} }
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
} }

View File

@@ -12,6 +12,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" /> <ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>

View File

@@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) { public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string LoggerName => "Rpc:" + ServiceName;
public string TcpUrl => "tcp://" + Host + ":" + Port; public string TcpUrl => "tcp://" + Host + ":" + Port;
} }

View File

@@ -0,0 +1,64 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc;
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly MessageHandler<TServerListener> messageHandler;
private readonly RpcConnectionToClient<TClientListener> connection;
private RpcReceiverActor(Init init) {
this.messageDefinitions = init.MessageDefinitions;
this.messageHandler = init.MessageHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public interface ICommand {}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
Handle(command.Data);
if (await connection.GetAuthorization()) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
});
}
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
Handle(command.Data);
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, messageHandler);
}
}

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
@@ -9,25 +11,29 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime { public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken); return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
} }
} }
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config); var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch(); return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
} }
private readonly string serviceName;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory; private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly ActorSystem actorSystem;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) { private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory; this.listenerFactory = listenerFactory;
this.actorSystem = actorSystem;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime")); this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
@@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
} }
var clientLoggerName = LoggerName + ":" + routingId; var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker); var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed; connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager); var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
clients[routingId] = client; clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
} }
client.Enqueue(messageType, data);
} }
foreach (var client in clients.Values) { foreach (var client in clients.Values) {
@@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
private sealed class Client : MessageHandler<TServerListener> { private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; } public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue; private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) { public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
this.Connection = connection; this.Connection = connection;
this.Connection.Closed += OnConnectionClosed; this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue; this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager; this.taskManager = taskManager;
} }
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) { internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data); LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data)); receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
} }
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) { private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
@@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
} }
} }
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply)); return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
} }
@@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
taskManager.Run("Closing connection to " + e.RoutingId, async () => { taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving(); await StopReceiving();
await processingQueue.Stop(); await receiverActor.Stop();
await Connection.StopSending(); await Connection.StopSending();
Logger.Debug("Connection closed."); Logger.Debug("Connection closed.");
}); });