mirror of
				https://github.com/chylex/Minecraft-Phantom-Panel.git
				synced 2025-10-24 20:23:39 +02:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			main
			...
			dcfe66a337
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| dcfe66a337 | 
| @@ -17,6 +17,8 @@ using Phantom.Utils.Tasks; | ||||
| namespace Phantom.Controller.Services; | ||||
|  | ||||
| public sealed class ControllerServices : IAsyncDisposable { | ||||
| 	public ActorSystem ActorSystem { get; } | ||||
| 	 | ||||
| 	private TaskManager TaskManager { get; } | ||||
| 	private ControllerState ControllerState { get; } | ||||
| 	private MinecraftVersions MinecraftVersions { get; } | ||||
| @@ -32,8 +34,7 @@ public sealed class ControllerServices : IAsyncDisposable { | ||||
| 	private UserRoleManager UserRoleManager { get; } | ||||
| 	private UserLoginManager UserLoginManager { get; } | ||||
| 	private AuditLogManager AuditLogManager { get; } | ||||
| 	 | ||||
| 	private readonly ActorSystem actorSystem; | ||||
|  | ||||
| 	private readonly IDbContextProvider dbProvider; | ||||
| 	private readonly AuthToken webAuthToken; | ||||
| 	private readonly CancellationToken cancellationToken; | ||||
| @@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable { | ||||
| 		this.webAuthToken = webAuthToken; | ||||
| 		this.cancellationToken = shutdownCancellationToken; | ||||
| 		 | ||||
| 		this.actorSystem = ActorSystemFactory.Create("Controller"); | ||||
| 		this.ActorSystem = ActorSystemFactory.Create("Controller"); | ||||
|  | ||||
| 		this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>()); | ||||
| 		this.ControllerState = new ControllerState(); | ||||
| 		this.MinecraftVersions = new MinecraftVersions(); | ||||
| 		 | ||||
| 		this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken); | ||||
| 		this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken); | ||||
| 		this.InstanceLogManager = new InstanceLogManager(); | ||||
| 		 | ||||
| 		this.UserManager = new UserManager(dbProvider); | ||||
| @@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable { | ||||
| 	} | ||||
|  | ||||
| 	public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) { | ||||
| 		return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager); | ||||
| 		return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager); | ||||
| 	} | ||||
|  | ||||
| 	public async Task Initialize() { | ||||
| @@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable { | ||||
| 	} | ||||
|  | ||||
| 	public async ValueTask DisposeAsync() { | ||||
| 		await actorSystem.Terminate(); | ||||
| 		actorSystem.Dispose(); | ||||
| 		await ActorSystem.Terminate(); | ||||
| 		ActorSystem.Dispose(); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -54,24 +54,23 @@ try { | ||||
| 	PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); | ||||
| 	 | ||||
| 	var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString); | ||||
| 	 | ||||
| 	await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) { | ||||
| 		await controllerServices.Initialize(); | ||||
|  | ||||
| 		static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { | ||||
| 			return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate); | ||||
| 		} | ||||
| 	await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken); | ||||
| 	await controllerServices.Initialize(); | ||||
|  | ||||
| 		var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc")); | ||||
| 		try { | ||||
| 			await Task.WhenAll( | ||||
| 				RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken), | ||||
| 				RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) | ||||
| 			); | ||||
| 		} finally { | ||||
| 			await rpcTaskManager.Stop(); | ||||
| 			NetMQConfig.Cleanup(); | ||||
| 		} | ||||
| 	static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { | ||||
| 		return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate); | ||||
| 	} | ||||
|  | ||||
| 	var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc")); | ||||
| 	try { | ||||
| 		await Task.WhenAll( | ||||
| 			RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken), | ||||
| 			RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken) | ||||
| 		); | ||||
| 	} finally { | ||||
| 		await rpcTaskManager.Stop(); | ||||
| 		NetMQConfig.Cleanup(); | ||||
| 	} | ||||
|  | ||||
| 	return 0; | ||||
|   | ||||
| @@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor; | ||||
| public readonly struct ActorConfiguration { | ||||
| 	public SupervisorStrategy? SupervisorStrategy { get; init; } | ||||
| 	public string? MailboxType { get; init; } | ||||
| 	public int? StashCapacity { get; init; } | ||||
|  | ||||
| 	internal Props Apply(Props props) { | ||||
| 		if (SupervisorStrategy != null) { | ||||
| @@ -14,6 +15,10 @@ public readonly struct ActorConfiguration { | ||||
| 		if (MailboxType != null) { | ||||
| 			props = props.WithMailbox(MailboxType); | ||||
| 		} | ||||
|  | ||||
| 		if (StashCapacity != null) { | ||||
| 			props = props.WithStashCapacity(StashCapacity.Value); | ||||
| 		} | ||||
| 		 | ||||
| 		return props; | ||||
| 	} | ||||
|   | ||||
| @@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> { | ||||
| 	public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) { | ||||
| 		return Request(message, timeout: null, cancellationToken); | ||||
| 	} | ||||
|  | ||||
| 	public Task<bool> Stop(TimeSpan? timeout = null) { | ||||
| 		return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -12,6 +12,7 @@ | ||||
|   </ItemGroup> | ||||
|    | ||||
|   <ItemGroup> | ||||
|     <ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" /> | ||||
|     <ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" /> | ||||
|     <ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" /> | ||||
|   </ItemGroup> | ||||
|   | ||||
| @@ -2,6 +2,7 @@ | ||||
|  | ||||
| namespace Phantom.Utils.Rpc; | ||||
|  | ||||
| public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) { | ||||
| public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) { | ||||
| 	public string LoggerName => "Rpc:" + ServiceName; | ||||
| 	public string TcpUrl => "tcp://" + Host + ":" + Port; | ||||
| } | ||||
|   | ||||
							
								
								
									
										64
									
								
								Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | ||||
| using Akka.Actor; | ||||
| using Akka.Event; | ||||
| using Phantom.Utils.Actor; | ||||
| using Phantom.Utils.Rpc.Message; | ||||
| using Phantom.Utils.Rpc.Runtime; | ||||
|  | ||||
| namespace Phantom.Utils.Rpc; | ||||
|  | ||||
| sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { | ||||
| 	public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection); | ||||
| 	 | ||||
| 	public static Props<ICommand> Factory(Init init) { | ||||
| 		return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration { | ||||
| 			SupervisorStrategy = SupervisorStrategies.Resume, | ||||
| 			StashCapacity = 100 | ||||
| 		}); | ||||
| 	} | ||||
|  | ||||
| 	public IStash Stash { get; set; } = null!; | ||||
| 	 | ||||
| 	private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; | ||||
| 	private readonly MessageHandler<TServerListener> messageHandler; | ||||
| 	private readonly RpcConnectionToClient<TClientListener> connection; | ||||
| 	 | ||||
| 	private RpcReceiverActor(Init init) { | ||||
| 		this.messageDefinitions = init.MessageDefinitions; | ||||
| 		this.messageHandler = init.MessageHandler; | ||||
| 		this.connection = init.Connection; | ||||
| 		 | ||||
| 		ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized); | ||||
| 	} | ||||
|  | ||||
| 	public interface ICommand {} | ||||
| 	 | ||||
| 	public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand; | ||||
|  | ||||
| 	private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) { | ||||
| 		if (messageDefinitions.IsRegistrationMessage(command.MessageType)) { | ||||
| 			Handle(command.Data); | ||||
|  | ||||
| 			if (await connection.GetAuthorization()) { | ||||
| 				Stash.UnstashAll(); | ||||
| 				 | ||||
| 				Become(() => { | ||||
| 					Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized); | ||||
| 				}); | ||||
| 			} | ||||
| 		} | ||||
| 		else if (Stash.IsFull) { | ||||
| 			Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType); | ||||
| 		} | ||||
| 		else { | ||||
| 			Stash.Stash(); | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	private void ReceiveMessageAuthorized(ReceiveMessageCommand command) { | ||||
| 		Handle(command.Data); | ||||
| 	} | ||||
| 	 | ||||
| 	private void Handle(ReadOnlyMemory<byte> data) { | ||||
| 		messageDefinitions.ToServer.Handle(data, messageHandler); | ||||
| 	} | ||||
| } | ||||
| @@ -1,5 +1,7 @@ | ||||
| using System.Collections.Concurrent; | ||||
| using Akka.Actor; | ||||
| using NetMQ.Sockets; | ||||
| using Phantom.Utils.Actor; | ||||
| using Phantom.Utils.Logging; | ||||
| using Phantom.Utils.Rpc.Message; | ||||
| using Phantom.Utils.Rpc.Sockets; | ||||
| @@ -9,25 +11,29 @@ using Serilog.Events; | ||||
| namespace Phantom.Utils.Rpc.Runtime; | ||||
|  | ||||
| public static class RpcServerRuntime { | ||||
| 	public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { | ||||
| 		return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken); | ||||
| 	public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { | ||||
| 		return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> { | ||||
| 	internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) { | ||||
| 	internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) { | ||||
| 		var socket = RpcServerSocket.Connect(config); | ||||
| 		return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch(); | ||||
| 		return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch(); | ||||
| 	} | ||||
|  | ||||
| 	private readonly string serviceName; | ||||
| 	private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; | ||||
| 	private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory; | ||||
| 	private readonly ActorSystem actorSystem; | ||||
| 	private readonly TaskManager taskManager; | ||||
| 	private readonly CancellationToken cancellationToken; | ||||
|  | ||||
| 	private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) { | ||||
| 	private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) { | ||||
| 		this.serviceName = socket.Config.ServiceName; | ||||
| 		this.messageDefinitions = messageDefinitions; | ||||
| 		this.listenerFactory = listenerFactory; | ||||
| 		this.actorSystem = actorSystem; | ||||
| 		this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime")); | ||||
| 		this.cancellationToken = cancellationToken; | ||||
| 	} | ||||
| @@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM | ||||
| 				} | ||||
| 				 | ||||
| 				var clientLoggerName = LoggerName + ":" + routingId; | ||||
| 				var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId); | ||||
| 				var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker); | ||||
| 				 | ||||
| 				connection.Closed += OnConnectionClosed; | ||||
|  | ||||
| 				client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager); | ||||
| 				var clientActorName = "RpcReceive-" + serviceName + "-" + routingId; | ||||
| 				 | ||||
| 				client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager); | ||||
| 				clients[routingId] = client; | ||||
| 				client.EnqueueRegistrationMessage(messageType, data); | ||||
| 			} | ||||
| 			else { | ||||
| 				client.Enqueue(messageType, data); | ||||
| 			} | ||||
| 			 | ||||
| 			client.Enqueue(messageType, data); | ||||
| 		} | ||||
|  | ||||
| 		foreach (var client in clients.Values) { | ||||
| @@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM | ||||
| 	private sealed class Client : MessageHandler<TServerListener> { | ||||
| 		public RpcConnectionToClient<TClientListener> Connection { get; } | ||||
| 		 | ||||
| 		private readonly RpcQueue processingQueue; | ||||
| 		private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor; | ||||
| 		private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions; | ||||
| 		private readonly TaskManager taskManager; | ||||
| 		 | ||||
| 		public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) { | ||||
| 		public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) { | ||||
| 			this.Connection = connection; | ||||
| 			this.Connection.Closed += OnConnectionClosed; | ||||
| 			 | ||||
| 			this.processingQueue = processingQueue; | ||||
| 			this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName); | ||||
| 			this.messageDefinitions = messageDefinitions; | ||||
| 			this.taskManager = taskManager; | ||||
| 		} | ||||
|  | ||||
| 		internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) { | ||||
| 			LogMessageType(messageType, data); | ||||
| 			processingQueue.Enqueue(() => Handle(data)); | ||||
| 		} | ||||
| 		 | ||||
| 		internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) { | ||||
| 			LogMessageType(messageType, data); | ||||
| 			processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data)); | ||||
| 			receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data)); | ||||
| 		} | ||||
|  | ||||
| 		private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) { | ||||
| @@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		private void Handle(ReadOnlyMemory<byte> data) { | ||||
| 			messageDefinitions.ToServer.Handle(data, this); | ||||
| 		} | ||||
|  | ||||
| 		private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) { | ||||
| 			if (await Connection.GetAuthorization()) { | ||||
| 				Handle(data); | ||||
| 			} | ||||
| 			else { | ||||
| 				Logger.Warning("Dropped message after failed registration."); | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		protected override Task SendReply(uint sequenceId, byte[] serializedReply) { | ||||
| 			return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply)); | ||||
| 		} | ||||
| @@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM | ||||
| 			 | ||||
| 			taskManager.Run("Closing connection to " + e.RoutingId, async () => { | ||||
| 				await StopReceiving(); | ||||
| 				await processingQueue.Stop(); | ||||
| 				await receiverActor.Stop(); | ||||
| 				await Connection.StopSending(); | ||||
| 				Logger.Debug("Connection closed."); | ||||
| 			}); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user