mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-16 09:24:49 +02:00
Compare commits
1 Commits
wip-forge
...
dcfe66a337
Author | SHA1 | Date | |
---|---|---|---|
dcfe66a337
|
@@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
|
||||
namespace Phantom.Controller.Services;
|
||||
|
||||
public sealed class ControllerServices : IAsyncDisposable {
|
||||
public ActorSystem ActorSystem { get; }
|
||||
|
||||
private TaskManager TaskManager { get; }
|
||||
private ControllerState ControllerState { get; }
|
||||
private MinecraftVersions MinecraftVersions { get; }
|
||||
@@ -33,7 +35,6 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
private UserLoginManager UserLoginManager { get; }
|
||||
private AuditLogManager AuditLogManager { get; }
|
||||
|
||||
private readonly ActorSystem actorSystem;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly AuthToken webAuthToken;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
@@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
this.webAuthToken = webAuthToken;
|
||||
this.cancellationToken = shutdownCancellationToken;
|
||||
|
||||
this.actorSystem = ActorSystemFactory.Create("Controller");
|
||||
this.ActorSystem = ActorSystemFactory.Create("Controller");
|
||||
|
||||
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
|
||||
this.ControllerState = new ControllerState();
|
||||
this.MinecraftVersions = new MinecraftVersions();
|
||||
|
||||
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
||||
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
||||
this.InstanceLogManager = new InstanceLogManager();
|
||||
|
||||
this.UserManager = new UserManager(dbProvider);
|
||||
@@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
}
|
||||
|
||||
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
|
||||
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
||||
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
||||
}
|
||||
|
||||
public async Task Initialize() {
|
||||
@@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync() {
|
||||
await actorSystem.Terminate();
|
||||
actorSystem.Dispose();
|
||||
await ActorSystem.Terminate();
|
||||
ActorSystem.Dispose();
|
||||
}
|
||||
}
|
||||
|
@@ -55,24 +55,23 @@ try {
|
||||
|
||||
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
||||
|
||||
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
|
||||
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
|
||||
await controllerServices.Initialize();
|
||||
|
||||
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
||||
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
|
||||
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
|
||||
}
|
||||
|
||||
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
||||
try {
|
||||
await Task.WhenAll(
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
|
||||
);
|
||||
} finally {
|
||||
await rpcTaskManager.Stop();
|
||||
NetMQConfig.Cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
} catch (OperationCanceledException) {
|
||||
|
@@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
|
||||
public readonly struct ActorConfiguration {
|
||||
public SupervisorStrategy? SupervisorStrategy { get; init; }
|
||||
public string? MailboxType { get; init; }
|
||||
public int? StashCapacity { get; init; }
|
||||
|
||||
internal Props Apply(Props props) {
|
||||
if (SupervisorStrategy != null) {
|
||||
@@ -15,6 +16,10 @@ public readonly struct ActorConfiguration {
|
||||
props = props.WithMailbox(MailboxType);
|
||||
}
|
||||
|
||||
if (StashCapacity != null) {
|
||||
props = props.WithStashCapacity(StashCapacity.Value);
|
||||
}
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
@@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
|
||||
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
|
||||
return Request(message, timeout: null, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<bool> Stop(TimeSpan? timeout = null) {
|
||||
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
@@ -12,6 +12,7 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
|
||||
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
||||
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
|
||||
</ItemGroup>
|
||||
|
@@ -2,6 +2,7 @@
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||
public string LoggerName => "Rpc:" + ServiceName;
|
||||
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||
}
|
||||
|
64
Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
Normal file
64
Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
|
||||
SupervisorStrategy = SupervisorStrategies.Resume,
|
||||
StashCapacity = 100
|
||||
});
|
||||
}
|
||||
|
||||
public IStash Stash { get; set; } = null!;
|
||||
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly MessageHandler<TServerListener> messageHandler;
|
||||
private readonly RpcConnectionToClient<TClientListener> connection;
|
||||
|
||||
private RpcReceiverActor(Init init) {
|
||||
this.messageDefinitions = init.MessageDefinitions;
|
||||
this.messageHandler = init.MessageHandler;
|
||||
this.connection = init.Connection;
|
||||
|
||||
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
|
||||
|
||||
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
|
||||
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
|
||||
Handle(command.Data);
|
||||
|
||||
if (await connection.GetAuthorization()) {
|
||||
Stash.UnstashAll();
|
||||
|
||||
Become(() => {
|
||||
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
|
||||
});
|
||||
}
|
||||
}
|
||||
else if (Stash.IsFull) {
|
||||
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
|
||||
}
|
||||
else {
|
||||
Stash.Stash();
|
||||
}
|
||||
}
|
||||
|
||||
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
|
||||
Handle(command.Data);
|
||||
}
|
||||
|
||||
private void Handle(ReadOnlyMemory<byte> data) {
|
||||
messageDefinitions.ToServer.Handle(data, messageHandler);
|
||||
}
|
||||
}
|
@@ -1,5 +1,7 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
@@ -9,25 +11,29 @@ using Serilog.Events;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public static class RpcServerRuntime {
|
||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
|
||||
var socket = RpcServerSocket.Connect(config);
|
||||
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
||||
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
|
||||
}
|
||||
|
||||
private readonly string serviceName;
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
||||
private readonly ActorSystem actorSystem;
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||
this.serviceName = socket.Config.ServiceName;
|
||||
this.messageDefinitions = messageDefinitions;
|
||||
this.listenerFactory = listenerFactory;
|
||||
this.actorSystem = actorSystem;
|
||||
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
|
||||
this.cancellationToken = cancellationToken;
|
||||
}
|
||||
@@ -62,19 +68,18 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
}
|
||||
|
||||
var clientLoggerName = LoggerName + ":" + routingId;
|
||||
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
|
||||
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||
|
||||
connection.Closed += OnConnectionClosed;
|
||||
|
||||
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
|
||||
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
|
||||
|
||||
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
|
||||
clients[routingId] = client;
|
||||
client.EnqueueRegistrationMessage(messageType, data);
|
||||
}
|
||||
else {
|
||||
|
||||
client.Enqueue(messageType, data);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var client in clients.Values) {
|
||||
client.Connection.Close();
|
||||
@@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
private sealed class Client : MessageHandler<TServerListener> {
|
||||
public RpcConnectionToClient<TClientListener> Connection { get; }
|
||||
|
||||
private readonly RpcQueue processingQueue;
|
||||
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly TaskManager taskManager;
|
||||
|
||||
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
||||
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
||||
this.Connection = connection;
|
||||
this.Connection.Closed += OnConnectionClosed;
|
||||
|
||||
this.processingQueue = processingQueue;
|
||||
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
|
||||
this.messageDefinitions = messageDefinitions;
|
||||
this.taskManager = taskManager;
|
||||
}
|
||||
|
||||
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
LogMessageType(messageType, data);
|
||||
processingQueue.Enqueue(() => Handle(data));
|
||||
}
|
||||
|
||||
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
LogMessageType(messageType, data);
|
||||
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
|
||||
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||
}
|
||||
|
||||
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
@@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
}
|
||||
}
|
||||
|
||||
private void Handle(ReadOnlyMemory<byte> data) {
|
||||
messageDefinitions.ToServer.Handle(data, this);
|
||||
}
|
||||
|
||||
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
|
||||
if (await Connection.GetAuthorization()) {
|
||||
Handle(data);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Dropped message after failed registration.");
|
||||
}
|
||||
}
|
||||
|
||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||
}
|
||||
@@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
|
||||
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
|
||||
await StopReceiving();
|
||||
await processingQueue.Stop();
|
||||
await receiverActor.Stop();
|
||||
await Connection.StopSending();
|
||||
Logger.Debug("Connection closed.");
|
||||
});
|
||||
|
Reference in New Issue
Block a user