1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-16 09:24:49 +02:00

1 Commits

Author SHA1 Message Date
dcfe66a337 Move RPC message receiving to Akka.NET 2024-03-15 22:00:37 +01:00
8 changed files with 120 additions and 58 deletions

View File

@@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services; namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable { public sealed class ControllerServices : IAsyncDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; } private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; } private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; } private MinecraftVersions MinecraftVersions { get; }
@@ -32,8 +34,7 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserRoleManager UserRoleManager { get; } private UserRoleManager UserRoleManager { get; }
private UserLoginManager UserLoginManager { get; } private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; } private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
private readonly IDbContextProvider dbProvider; private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken; private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
@@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
this.webAuthToken = webAuthToken; this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken; this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller"); this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>()); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState(); this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions(); this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken); this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager(); this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider); this.UserManager = new UserManager(dbProvider);
@@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
} }
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) { public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager); return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
} }
public async Task Initialize() { public async Task Initialize() {
@@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
} }
public async ValueTask DisposeAsync() { public async ValueTask DisposeAsync() {
await actorSystem.Terminate(); await ActorSystem.Terminate();
actorSystem.Dispose(); ActorSystem.Dispose();
} }
} }

View File

@@ -54,24 +54,23 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString); var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate); await controllerServices.Initialize();
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc")); static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
try { return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
await Task.WhenAll( }
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
); try {
} finally { await Task.WhenAll(
await rpcTaskManager.Stop(); RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
NetMQConfig.Cleanup(); RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
} );
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
} }
return 0; return 0;

View File

@@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration { public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; } public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; } public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) { internal Props Apply(Props props) {
if (SupervisorStrategy != null) { if (SupervisorStrategy != null) {
@@ -14,6 +15,10 @@ public readonly struct ActorConfiguration {
if (MailboxType != null) { if (MailboxType != null) {
props = props.WithMailbox(MailboxType); props = props.WithMailbox(MailboxType);
} }
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props; return props;
} }

View File

@@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) { public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken); return Request(message, timeout: null, cancellationToken);
} }
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
} }

View File

@@ -12,6 +12,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" /> <ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" /> <ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup> </ItemGroup>

View File

@@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) { public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string LoggerName => "Rpc:" + ServiceName;
public string TcpUrl => "tcp://" + Host + ":" + Port; public string TcpUrl => "tcp://" + Host + ":" + Port;
} }

View File

@@ -0,0 +1,64 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc;
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly MessageHandler<TServerListener> messageHandler;
private readonly RpcConnectionToClient<TClientListener> connection;
private RpcReceiverActor(Init init) {
this.messageDefinitions = init.MessageDefinitions;
this.messageHandler = init.MessageHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public interface ICommand {}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
Handle(command.Data);
if (await connection.GetAuthorization()) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
});
}
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
Handle(command.Data);
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, messageHandler);
}
}

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets; using Phantom.Utils.Rpc.Sockets;
@@ -9,25 +11,29 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime { public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken); return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
} }
} }
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config); var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch(); return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
} }
private readonly string serviceName;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory; private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly ActorSystem actorSystem;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) { private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory; this.listenerFactory = listenerFactory;
this.actorSystem = actorSystem;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime")); this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
@@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
} }
var clientLoggerName = LoggerName + ":" + routingId; var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker); var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed; connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager); var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
clients[routingId] = client; clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
} }
client.Enqueue(messageType, data);
} }
foreach (var client in clients.Values) { foreach (var client in clients.Values) {
@@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
private sealed class Client : MessageHandler<TServerListener> { private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; } public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue; private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) { public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
this.Connection = connection; this.Connection = connection;
this.Connection.Closed += OnConnectionClosed; this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue; this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
this.messageDefinitions = messageDefinitions; this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager; this.taskManager = taskManager;
} }
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) { internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data); LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data)); receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
} }
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) { private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
@@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
} }
} }
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) { protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply)); return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
} }
@@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
taskManager.Run("Closing connection to " + e.RoutingId, async () => { taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving(); await StopReceiving();
await processingQueue.Stop(); await receiverActor.Stop();
await Connection.StopSending(); await Connection.StopSending();
Logger.Debug("Connection closed."); Logger.Debug("Connection closed.");
}); });