1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-16 09:24:49 +02:00

1 Commits

Author SHA1 Message Date
dcfe66a337 Move RPC message receiving to Akka.NET 2024-03-15 22:00:37 +01:00
8 changed files with 120 additions and 58 deletions

View File

@@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
@@ -32,8 +34,7 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserRoleManager UserRoleManager { get; }
private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken;
@@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller");
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider);
@@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
}
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
}
public async Task Initialize() {
@@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
}
public async ValueTask DisposeAsync() {
await actorSystem.Terminate();
actorSystem.Dispose();
await ActorSystem.Terminate();
ActorSystem.Dispose();
}
}

View File

@@ -54,24 +54,23 @@ try {
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
}
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize();
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
return 0;

View File

@@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
@@ -14,6 +15,10 @@ public readonly struct ActorConfiguration {
if (MailboxType != null) {
props = props.WithMailbox(MailboxType);
}
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props;
}

View File

@@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
}

View File

@@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>

View File

@@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string LoggerName => "Rpc:" + ServiceName;
public string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@@ -0,0 +1,64 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc;
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly MessageHandler<TServerListener> messageHandler;
private readonly RpcConnectionToClient<TClientListener> connection;
private RpcReceiverActor(Init init) {
this.messageDefinitions = init.MessageDefinitions;
this.messageHandler = init.MessageHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public interface ICommand {}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
Handle(command.Data);
if (await connection.GetAuthorization()) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
});
}
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
Handle(command.Data);
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, messageHandler);
}
}

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
@@ -9,25 +11,29 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
}
private readonly string serviceName;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly ActorSystem actorSystem;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.actorSystem = actorSystem;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken;
}
@@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
}
client.Enqueue(messageType, data);
}
foreach (var client in clients.Values) {
@@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue;
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager;
}
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
@@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
@@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving();
await processingQueue.Stop();
await receiverActor.Stop();
await Connection.StopSending();
Logger.Debug("Connection closed.");
});