mirror of
				https://github.com/chylex/Minecraft-Phantom-Panel.git
				synced 2025-10-31 20:17:16 +01:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			3c10e1a8f9
			...
			d2e7f4f876
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| d2e7f4f876 | |||
| c4cf45776d | 
| @@ -7,7 +7,7 @@ using Serilog; | |||||||
| namespace Phantom.Agent.Minecraft.Java; | namespace Phantom.Agent.Minecraft.Java; | ||||||
|  |  | ||||||
| public sealed class JavaRuntimeDiscovery { | public sealed class JavaRuntimeDiscovery { | ||||||
| 	private static readonly ILogger Logger = PhantomLogger.Create(typeof(JavaRuntimeDiscovery)); | 	private static readonly ILogger Logger = PhantomLogger.Create(nameof(JavaRuntimeDiscovery)); | ||||||
|  |  | ||||||
| 	public static string? GetSystemSearchPath() { | 	public static string? GetSystemSearchPath() { | ||||||
| 		const string LinuxJavaPath = "/usr/lib/jvm"; | 		const string LinuxJavaPath = "/usr/lib/jvm"; | ||||||
|   | |||||||
| @@ -31,7 +31,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> { | |||||||
| 	private readonly SemaphoreSlim disconnectSemaphore; | 	private readonly SemaphoreSlim disconnectSemaphore; | ||||||
| 	private readonly CancellationToken receiveCancellationToken; | 	private readonly CancellationToken receiveCancellationToken; | ||||||
|  |  | ||||||
| 	private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) { | 	private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, socket) { | ||||||
| 		this.config = config; | 		this.config = config; | ||||||
| 		this.agentGuid = agentGuid; | 		this.agentGuid = agentGuid; | ||||||
| 		this.messageListenerFactory = messageListenerFactory; | 		this.messageListenerFactory = messageListenerFactory; | ||||||
| @@ -40,7 +40,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	protected override void Connect(ClientSocket socket) { | 	protected override void Connect(ClientSocket socket) { | ||||||
| 		var logger = config.Logger; | 		var logger = config.RuntimeLogger; | ||||||
| 		var url = config.TcpUrl; | 		var url = config.TcpUrl; | ||||||
|  |  | ||||||
| 		logger.Information("Starting ZeroMQ client and connecting to {Url}...", url); | 		logger.Information("Starting ZeroMQ client and connecting to {Url}...", url); | ||||||
| @@ -52,7 +52,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> { | |||||||
| 		var connection = new RpcServerConnection(socket, replyTracker); | 		var connection = new RpcServerConnection(socket, replyTracker); | ||||||
| 		ServerMessaging.SetCurrentConnection(connection); | 		ServerMessaging.SetCurrentConnection(connection); | ||||||
| 		 | 		 | ||||||
| 		var logger = config.Logger; | 		var logger = config.RuntimeLogger; | ||||||
| 		var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken); | 		var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken); | ||||||
| 		var keepAliveLoop = new KeepAliveLoop(connection); | 		var keepAliveLoop = new KeepAliveLoop(connection); | ||||||
|  |  | ||||||
| @@ -93,7 +93,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> { | |||||||
| 		var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None); | 		var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None); | ||||||
| 		var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask); | 		var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask); | ||||||
| 		if (finishedTask == unregisterTimeoutTask) { | 		if (finishedTask == unregisterTimeoutTask) { | ||||||
| 			config.Logger.Error("Timed out communicating agent shutdown with the server."); | 			config.RuntimeLogger.Error("Timed out communicating agent shutdown with the server."); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -5,7 +5,7 @@ using Serilog; | |||||||
| namespace Phantom.Agent.Rpc; | namespace Phantom.Agent.Rpc; | ||||||
|  |  | ||||||
| public static class ServerMessaging { | public static class ServerMessaging { | ||||||
| 	private static readonly ILogger Logger = PhantomLogger.Create(typeof(ServerMessaging)); | 	private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging)); | ||||||
| 	 | 	 | ||||||
| 	private static RpcServerConnection? CurrentConnection { get; set; } | 	private static RpcServerConnection? CurrentConnection { get; set; } | ||||||
| 	private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready."); | 	private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready."); | ||||||
|   | |||||||
| @@ -18,7 +18,7 @@ public sealed class AgentServices { | |||||||
|  |  | ||||||
| 	public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) { | 	public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) { | ||||||
| 		this.AgentFolders = agentFolders; | 		this.AgentFolders = agentFolders; | ||||||
| 		this.TaskManager = new TaskManager(); | 		this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>()); | ||||||
| 		this.JavaRuntimeRepository = new JavaRuntimeRepository(); | 		this.JavaRuntimeRepository = new JavaRuntimeRepository(); | ||||||
| 		this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager); | 		this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager); | ||||||
| 	} | 	} | ||||||
| @@ -30,10 +30,9 @@ public sealed class AgentServices { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public async Task Shutdown() { | 	public async Task Shutdown() { | ||||||
| 		Logger.Information("Stopping instances..."); | 		Logger.Information("Stopping services..."); | ||||||
| 		await InstanceSessionManager.StopAll(); |  | ||||||
| 		 | 		 | ||||||
| 		Logger.Information("Stopping task manager..."); | 		await InstanceSessionManager.StopAll(); | ||||||
| 		await TaskManager.Stop(); | 		await TaskManager.Stop(); | ||||||
| 		 | 		 | ||||||
| 		Logger.Information("Services stopped."); | 		Logger.Information("Services stopped."); | ||||||
|   | |||||||
| @@ -143,7 +143,7 @@ sealed class Instance : IDisposable { | |||||||
| 		public override void ReportStatus(IInstanceStatus newStatus) { | 		public override void ReportStatus(IInstanceStatus newStatus) { | ||||||
| 			int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter); | 			int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter); | ||||||
| 			 | 			 | ||||||
| 			instance.launchServices.TaskManager.Run(async () => { | 			instance.launchServices.TaskManager.Run("Report status of instance " + instance.shortName + " as " + newStatus.GetType().Name, async () => { | ||||||
| 				if (myStatusUpdateCounter == statusUpdateCounter) { | 				if (myStatusUpdateCounter == statusUpdateCounter) { | ||||||
| 					instance.currentStatus = newStatus; | 					instance.currentStatus = newStatus; | ||||||
| 					await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus)); | 					await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus)); | ||||||
|   | |||||||
| @@ -24,7 +24,7 @@ sealed class InstanceLogSender { | |||||||
| 		this.logger = PhantomLogger.Create<InstanceLogSender>(name); | 		this.logger = PhantomLogger.Create<InstanceLogSender>(name); | ||||||
| 		this.cancellationTokenSource = new CancellationTokenSource(); | 		this.cancellationTokenSource = new CancellationTokenSource(); | ||||||
| 		this.cancellationToken = cancellationTokenSource.Token; | 		this.cancellationToken = cancellationTokenSource.Token; | ||||||
| 		taskManager.Run(Run); | 		taskManager.Run("Instance log sender for " + name, Run); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	private async Task Run() { | 	private async Task Run() { | ||||||
|   | |||||||
| @@ -132,6 +132,8 @@ sealed class InstanceSessionManager : IDisposable { | |||||||
|  |  | ||||||
| 	public async Task StopAll() { | 	public async Task StopAll() { | ||||||
| 		shutdownCancellationTokenSource.Cancel(); | 		shutdownCancellationTokenSource.Cancel(); | ||||||
|  | 		 | ||||||
|  | 		Logger.Information("Stopping all instances..."); | ||||||
|  |  | ||||||
| 		await semaphore.WaitAsync(CancellationToken.None); | 		await semaphore.WaitAsync(CancellationToken.None); | ||||||
| 		try { | 		try { | ||||||
|   | |||||||
| @@ -19,7 +19,7 @@ sealed class InstanceLaunchingState : IInstanceState, IDisposable { | |||||||
| 	public void Initialize() { | 	public void Initialize() { | ||||||
| 		context.Logger.Information("Session starting..."); | 		context.Logger.Information("Session starting..."); | ||||||
|  |  | ||||||
| 		var launchTask = context.LaunchServices.TaskManager.Run(DoLaunch); | 		var launchTask = context.LaunchServices.TaskManager.Run("Launch procedure for instance " + context.ShortName, DoLaunch); | ||||||
| 		launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default); | 		launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default); | ||||||
| 		launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default); | 		launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default); | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -30,7 +30,7 @@ sealed class InstanceRunningState : IInstanceState { | |||||||
| 		if (session.HasEnded) { | 		if (session.HasEnded) { | ||||||
| 			if (sessionObjects.Dispose()) { | 			if (sessionObjects.Dispose()) { | ||||||
| 				context.Logger.Warning("Session ended immediately after it was started."); | 				context.Logger.Warning("Session ended immediately after it was started."); | ||||||
| 				context.LaunchServices.TaskManager.Run(() => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError))); | 				context.LaunchServices.TaskManager.Run("Transition state of instance " + context.ShortName + " to not running", () => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError))); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		else { | 		else { | ||||||
| @@ -75,7 +75,7 @@ sealed class InstanceRunningState : IInstanceState { | |||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| 		isStopping = true; | 		isStopping = true; | ||||||
| 		context.LaunchServices.TaskManager.Run(() => StopLater(stopStrategy.Seconds)); | 		context.LaunchServices.TaskManager.Run("Delayed stop timer for instance " + context.ShortName, () => StopLater(stopStrategy.Seconds)); | ||||||
| 		return (this, StopInstanceResult.StopInitiated); | 		return (this, StopInstanceResult.StopInitiated); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,7 +21,7 @@ sealed class InstanceStoppingState : IInstanceState, IDisposable { | |||||||
| 	public void Initialize() { | 	public void Initialize() { | ||||||
| 		context.Logger.Information("Session stopping."); | 		context.Logger.Information("Session stopping."); | ||||||
| 		context.ReportStatus(InstanceStatus.Stopping); | 		context.ReportStatus(InstanceStatus.Stopping); | ||||||
| 		context.LaunchServices.TaskManager.Run(DoStop); | 		context.LaunchServices.TaskManager.Run("Stop procedure for instance " + context.ShortName, DoStop); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	private async Task DoStop() { | 	private async Task DoStop() { | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ using Serilog; | |||||||
| namespace Phantom.Agent; | namespace Phantom.Agent; | ||||||
|  |  | ||||||
| static class AgentKey { | static class AgentKey { | ||||||
| 	private static ILogger Logger { get; } = PhantomLogger.Create(typeof(AgentKey)); | 	private static ILogger Logger { get; } = PhantomLogger.Create(nameof(AgentKey)); | ||||||
|  |  | ||||||
| 	public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) { | 	public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) { | ||||||
| 		if (agentKeyFilePath != null) { | 		if (agentKeyFilePath != null) { | ||||||
|   | |||||||
| @@ -6,7 +6,7 @@ using Serilog; | |||||||
| namespace Phantom.Agent;  | namespace Phantom.Agent;  | ||||||
|  |  | ||||||
| static class GuidFile { | static class GuidFile { | ||||||
| 	private static ILogger Logger { get; } = PhantomLogger.Create(typeof(GuidFile)); | 	private static ILogger Logger { get; } = PhantomLogger.Create(nameof(GuidFile)); | ||||||
|  |  | ||||||
| 	private const string GuidFileName = "agent.guid"; | 	private const string GuidFileName = "agent.guid"; | ||||||
| 	 | 	 | ||||||
|   | |||||||
| @@ -53,7 +53,8 @@ try { | |||||||
| 	await agentServices.Initialize(); | 	await agentServices.Initialize(); | ||||||
|  |  | ||||||
| 	var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); | 	var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); | ||||||
| 	var rpcTask = RpcLauncher.Launch(new RpcConfiguration(PhantomLogger.Create("Rpc"), serverHost, serverPort, serverCertificate), agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken); | 	var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), serverHost, serverPort, serverCertificate); | ||||||
|  | 	var rpcTask = RpcLauncher.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken); | ||||||
| 	try { | 	try { | ||||||
| 		await rpcTask.WaitAsync(shutdownCancellationToken); | 		await rpcTask.WaitAsync(shutdownCancellationToken); | ||||||
| 	} finally { | 	} finally { | ||||||
|   | |||||||
| @@ -33,8 +33,8 @@ public static class PhantomLogger { | |||||||
| 		return Base.ForContext("Category", name); | 		return Base.ForContext("Category", name); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public static ILogger Create(Type type) { | 	public static ILogger Create(string name1, string name2) { | ||||||
| 		return Create(type.Name); | 		return Create(name1 + ":" + name2); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public static ILogger Create<T>() { | 	public static ILogger Create<T>() { | ||||||
| @@ -42,11 +42,11 @@ public static class PhantomLogger { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public static ILogger Create<T>(string name) { | 	public static ILogger Create<T>(string name) { | ||||||
| 		return Create(typeof(T).Name + ":" + name); | 		return Create(typeof(T).Name, name); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public static ILogger Create<T1, T2>() { | 	public static ILogger Create<T1, T2>() { | ||||||
| 		return Create(typeof(T1).Name + ":" + typeof(T2).Name); | 		return Create(typeof(T1).Name, typeof(T2).Name); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public static void Dispose() { | 	public static void Dispose() { | ||||||
|   | |||||||
| @@ -25,14 +25,14 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> { | |||||||
| 	private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory; | 	private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory; | ||||||
| 	private readonly CancellationToken cancellationToken; | 	private readonly CancellationToken cancellationToken; | ||||||
|  |  | ||||||
| 	private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket, config.Logger) { | 	private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, socket) { | ||||||
| 		this.config = config; | 		this.config = config; | ||||||
| 		this.listenerFactory = listenerFactory; | 		this.listenerFactory = listenerFactory; | ||||||
| 		this.cancellationToken = cancellationToken; | 		this.cancellationToken = cancellationToken; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	protected override void Connect(ServerSocket socket) { | 	protected override void Connect(ServerSocket socket) { | ||||||
| 		var logger = config.Logger; | 		var logger = config.RuntimeLogger; | ||||||
| 		var url = config.TcpUrl; | 		var url = config.TcpUrl; | ||||||
|  |  | ||||||
| 		logger.Information("Starting ZeroMQ server on {Url}...", url); | 		logger.Information("Starting ZeroMQ server on {Url}...", url); | ||||||
| @@ -41,7 +41,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) { | 	protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) { | ||||||
| 		var logger = config.Logger; | 		var logger = config.RuntimeLogger; | ||||||
| 		var clients = new Dictionary<ulong, Client>(); | 		var clients = new Dictionary<ulong, Client>(); | ||||||
|  |  | ||||||
| 		void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { | 		void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { | ||||||
|   | |||||||
| @@ -32,7 +32,7 @@ public sealed class AgentManager { | |||||||
| 		this.cancellationToken = configuration.CancellationToken; | 		this.cancellationToken = configuration.CancellationToken; | ||||||
| 		this.authToken = authToken; | 		this.authToken = authToken; | ||||||
| 		this.databaseProvider = databaseProvider; | 		this.databaseProvider = databaseProvider; | ||||||
| 		taskManager.Run(RefreshAgentStatus); | 		taskManager.Run("Refresh agent status loop", RefreshAgentStatus); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public async Task Initialize() { | 	public async Task Initialize() { | ||||||
|   | |||||||
| @@ -36,7 +36,7 @@ public sealed partial class AuditLog { | |||||||
|  |  | ||||||
| 	private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { | 	private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { | ||||||
| 		var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra); | 		var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra); | ||||||
| 		taskManager.Run(() => AddEventToDatabase(eventEntity)); | 		taskManager.Run("Store audit log event", () => AddEventToDatabase(eventEntity)); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { | 	private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { | ||||||
|   | |||||||
| @@ -20,7 +20,7 @@ public sealed class PhantomLoginStore { | |||||||
|  |  | ||||||
| 	private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) { | 	private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) { | ||||||
| 		this.cancellationToken = cancellationToken; | 		this.cancellationToken = cancellationToken; | ||||||
| 		taskManager.Run(RunExpirationLoop); | 		taskManager.Run("Web login entry expiration loop", RunExpirationLoop); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	private async Task RunExpirationLoop() { | 	private async Task RunExpirationLoop() { | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ using Serilog; | |||||||
| namespace Phantom.Server; | namespace Phantom.Server; | ||||||
|  |  | ||||||
| static class CertificateFiles { | static class CertificateFiles { | ||||||
| 	private static ILogger Logger { get; } = PhantomLogger.Create(typeof(CertificateFiles)); | 	private static ILogger Logger { get; } = PhantomLogger.Create(nameof(CertificateFiles)); | ||||||
|  |  | ||||||
| 	private const string SecretKeyFileName = "secret.key"; | 	private const string SecretKeyFileName = "secret.key"; | ||||||
| 	private const string AgentKeyFileName = "agent.key"; | 	private const string AgentKeyFileName = "agent.key"; | ||||||
|   | |||||||
| @@ -15,7 +15,7 @@ using WebConfiguration = Phantom.Server.Web.Configuration; | |||||||
| using WebLauncher = Phantom.Server.Web.Launcher; | using WebLauncher = Phantom.Server.Web.Launcher; | ||||||
|  |  | ||||||
| var cancellationTokenSource = new CancellationTokenSource(); | var cancellationTokenSource = new CancellationTokenSource(); | ||||||
| var taskManager = new TaskManager(); | var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Server")); | ||||||
|  |  | ||||||
| PosixSignals.RegisterCancellation(cancellationTokenSource, static () => { | PosixSignals.RegisterCancellation(cancellationTokenSource, static () => { | ||||||
| 	PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server..."); | 	PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server..."); | ||||||
| @@ -53,7 +53,7 @@ try { | |||||||
| 	 | 	 | ||||||
| 	var (certificate, agentToken) = certificateData.Value; | 	var (certificate, agentToken) = certificateData.Value; | ||||||
| 	 | 	 | ||||||
| 	var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), rpcServerHost, rpcServerPort, certificate); | 	var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), rpcServerHost, rpcServerPort, certificate); | ||||||
| 	var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token); | 	var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token); | ||||||
|  |  | ||||||
| 	PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); | 	PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); | ||||||
| @@ -79,7 +79,6 @@ try { | |||||||
| } finally { | } finally { | ||||||
| 	cancellationTokenSource.Cancel(); | 	cancellationTokenSource.Cancel(); | ||||||
| 	 | 	 | ||||||
| 	PhantomLogger.Root.Information("Stopping task manager..."); |  | ||||||
| 	await taskManager.Stop(); | 	await taskManager.Stop(); | ||||||
| 	 | 	 | ||||||
| 	cancellationTokenSource.Dispose(); | 	cancellationTokenSource.Dispose(); | ||||||
|   | |||||||
| @@ -19,7 +19,7 @@ public abstract class MessageHandler<TListener> { | |||||||
| 	 | 	 | ||||||
| 	internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { | 	internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { | ||||||
| 		cancellationToken.ThrowIfCancellationRequested(); | 		cancellationToken.ThrowIfCancellationRequested(); | ||||||
| 		taskManager.Run(async () => { | 		taskManager.Run("Handle message {Type}" + message.GetType().Name, async () => { | ||||||
| 			try { | 			try { | ||||||
| 				await Handle<TMessage, TReply>(sequenceId, message); | 				await Handle<TMessage, TReply>(sequenceId, message); | ||||||
| 			} catch (Exception e) { | 			} catch (Exception e) { | ||||||
|   | |||||||
| @@ -3,6 +3,6 @@ using Serilog; | |||||||
|  |  | ||||||
| namespace Phantom.Utils.Rpc; | namespace Phantom.Utils.Rpc; | ||||||
|  |  | ||||||
| public sealed record RpcConfiguration(ILogger Logger, string Host, ushort Port, NetMQCertificate ServerCertificate) { | public sealed record RpcConfiguration(ILogger RuntimeLogger, ILogger TaskManagerLogger, string Host, ushort Port, NetMQCertificate ServerCertificate) { | ||||||
| 	public string TcpUrl => "tcp://" + Host + ":" + Port; | 	public string TcpUrl => "tcp://" + Host + ":" + Port; | ||||||
| } | } | ||||||
|   | |||||||
| @@ -26,17 +26,17 @@ static class RpcRuntime { | |||||||
|  |  | ||||||
| public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() { | public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() { | ||||||
| 	private readonly TSocket socket; | 	private readonly TSocket socket; | ||||||
|  | 	private readonly ILogger runtimeLogger; | ||||||
| 	private readonly MessageReplyTracker replyTracker; | 	private readonly MessageReplyTracker replyTracker; | ||||||
| 	private readonly TaskManager taskManager; | 	private readonly TaskManager taskManager; | ||||||
| 	private readonly ILogger logger; |  | ||||||
|  |  | ||||||
| 	protected RpcRuntime(TSocket socket, ILogger logger) { | 	protected RpcRuntime(RpcConfiguration configuration, TSocket socket) { | ||||||
| 		RpcRuntime.MarkRuntimeCreated(); | 		RpcRuntime.MarkRuntimeCreated(); | ||||||
| 		RpcRuntime.SetDefaultSocketOptions(socket.Options); | 		RpcRuntime.SetDefaultSocketOptions(socket.Options); | ||||||
| 		this.socket = socket; | 		this.socket = socket; | ||||||
| 		this.logger = logger; | 		this.runtimeLogger = configuration.RuntimeLogger; | ||||||
| 		this.replyTracker = new MessageReplyTracker(logger); | 		this.replyTracker = new MessageReplyTracker(runtimeLogger); | ||||||
| 		this.taskManager = new TaskManager(); | 		this.taskManager = new TaskManager(configuration.TaskManagerLogger); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	protected async Task Launch() { | 	protected async Task Launch() { | ||||||
| @@ -51,13 +51,12 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new( | |||||||
| 		} catch (OperationCanceledException) { | 		} catch (OperationCanceledException) { | ||||||
| 			// ignore | 			// ignore | ||||||
| 		} finally { | 		} finally { | ||||||
| 			logger.Information("Stopping task manager..."); |  | ||||||
| 			await taskManager.Stop(); | 			await taskManager.Stop(); | ||||||
| 			await Disconnect(); | 			await Disconnect(); | ||||||
| 			 | 			 | ||||||
| 			socket.Dispose(); | 			socket.Dispose(); | ||||||
| 			NetMQConfig.Cleanup(); | 			NetMQConfig.Cleanup(); | ||||||
| 			logger.Information("ZeroMQ client stopped."); | 			runtimeLogger.Information("ZeroMQ client stopped."); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
|   | |||||||
| @@ -5,6 +5,10 @@ | |||||||
|     <ImplicitUsings>enable</ImplicitUsings> |     <ImplicitUsings>enable</ImplicitUsings> | ||||||
|     <Nullable>enable</Nullable> |     <Nullable>enable</Nullable> | ||||||
|   </PropertyGroup> |   </PropertyGroup> | ||||||
|  |  | ||||||
|  |   <ItemGroup> | ||||||
|  |     <PackageReference Include="Serilog" /> | ||||||
|  |   </ItemGroup> | ||||||
|    |    | ||||||
|   <ItemGroup> |   <ItemGroup> | ||||||
|     <ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" /> |     <ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" /> | ||||||
|   | |||||||
| @@ -1,21 +1,24 @@ | |||||||
| using System.Collections.Concurrent; | using System.Collections.Concurrent; | ||||||
| using Phantom.Utils.Collections; | using Phantom.Utils.Collections; | ||||||
|  | using Serilog; | ||||||
|  |  | ||||||
| namespace Phantom.Utils.Runtime;  | namespace Phantom.Utils.Runtime;  | ||||||
|  |  | ||||||
| public sealed class TaskManager { | public sealed class TaskManager { | ||||||
|  | 	private readonly ILogger logger; | ||||||
| 	private readonly CancellationTokenSource cancellationTokenSource = new (); | 	private readonly CancellationTokenSource cancellationTokenSource = new (); | ||||||
| 	private readonly CancellationToken cancellationToken; | 	private readonly CancellationToken cancellationToken; | ||||||
| 	 | 	 | ||||||
| 	private readonly ConcurrentDictionary<Task, object?> runningTasks = new (ReferenceEqualityComparer<Task>.Instance); | 	private readonly ConcurrentDictionary<Task, string> runningTasks = new (ReferenceEqualityComparer<Task>.Instance); | ||||||
| 	 | 	 | ||||||
| 	public TaskManager() { | 	public TaskManager(ILogger logger) { | ||||||
| 		cancellationToken = cancellationTokenSource.Token; | 		this.logger = logger; | ||||||
|  | 		this.cancellationToken = cancellationTokenSource.Token; | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	private T Add<T>(T task) where T : Task { | 	private T Add<T>(string name, T task) where T : Task { | ||||||
| 		cancellationToken.ThrowIfCancellationRequested(); | 		cancellationToken.ThrowIfCancellationRequested(); | ||||||
| 		runningTasks.TryAdd(task, null); | 		runningTasks.TryAdd(task, name); | ||||||
| 		task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default); | 		task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default); | ||||||
| 		return task; | 		return task; | ||||||
| 	} | 	} | ||||||
| @@ -24,21 +27,44 @@ public sealed class TaskManager { | |||||||
| 		runningTasks.TryRemove(task, out _); | 		runningTasks.TryRemove(task, out _); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public Task Run(Action action) { | 	public Task Run(string name, Action action) { | ||||||
| 		return Add(Task.Run(action, cancellationToken)); | 		return Add(name, Task.Run(action, cancellationToken)); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public Task Run(Func<Task> taskFunc) { | 	public Task Run(string name, Func<Task> taskFunc) { | ||||||
| 		return Add(Task.Run(taskFunc, cancellationToken)); | 		return Add(name, Task.Run(taskFunc, cancellationToken)); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public Task<T> Run<T>(Func<Task<T>> taskFunc) { | 	public Task<T> Run<T>(string name, Func<Task<T>> taskFunc) { | ||||||
| 		return Add(Task.Run(taskFunc, cancellationToken)); | 		return Add(name, Task.Run(taskFunc, cancellationToken)); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	public async Task Stop() { | 	public async Task Stop() { | ||||||
| 		cancellationTokenSource.Cancel(); | 		logger.Information("Stopping task manager..."); | ||||||
| 		 | 		 | ||||||
|  | 		cancellationTokenSource.Cancel(); | ||||||
|  |  | ||||||
|  | 		var remainingTasksAwaiterTask = WaitForRemainingTasks(); | ||||||
|  | 		while (true) { | ||||||
|  | 			var logStateTimeoutTask = Task.Delay(TimeSpan.FromSeconds(10), CancellationToken.None); | ||||||
|  | 			var completedTask = await Task.WhenAny(remainingTasksAwaiterTask, logStateTimeoutTask); | ||||||
|  | 			if (completedTask == logStateTimeoutTask) { | ||||||
|  | 				var remainingTaskNames = runningTasks.Values.Order().ToList(); | ||||||
|  | 				var remainingTaskNameList = string.Join('\n', remainingTaskNames.Select(static name => "- " + name)); | ||||||
|  | 				logger.Warning("Waiting for {TaskCount} task(s) to finish:\n{TaskNames}", remainingTaskNames.Count, remainingTaskNameList); | ||||||
|  | 			} | ||||||
|  | 			else { | ||||||
|  | 				break; | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		runningTasks.Clear(); | ||||||
|  | 		cancellationTokenSource.Dispose(); | ||||||
|  | 		 | ||||||
|  | 		logger.Information("Task manager stopped."); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	private async Task WaitForRemainingTasks() { | ||||||
| 		foreach (var task in runningTasks.Keys) { | 		foreach (var task in runningTasks.Keys) { | ||||||
| 			try { | 			try { | ||||||
| 				await task; | 				await task; | ||||||
| @@ -46,8 +72,5 @@ public sealed class TaskManager { | |||||||
| 				// ignored | 				// ignored | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		 |  | ||||||
| 		runningTasks.Clear(); |  | ||||||
| 		cancellationTokenSource.Dispose(); |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user