1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-18 06:42:50 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
d2e7f4f876
Add TaskManager shutdown logging of remaining tasks 2023-01-25 12:08:53 +01:00
c4cf45776d
Refactor PhantomLogger overloads 2023-01-25 05:04:28 +01:00
25 changed files with 84 additions and 57 deletions

View File

@ -7,7 +7,7 @@ using Serilog;
namespace Phantom.Agent.Minecraft.Java; namespace Phantom.Agent.Minecraft.Java;
public sealed class JavaRuntimeDiscovery { public sealed class JavaRuntimeDiscovery {
private static readonly ILogger Logger = PhantomLogger.Create(typeof(JavaRuntimeDiscovery)); private static readonly ILogger Logger = PhantomLogger.Create(nameof(JavaRuntimeDiscovery));
public static string? GetSystemSearchPath() { public static string? GetSystemSearchPath() {
const string LinuxJavaPath = "/usr/lib/jvm"; const string LinuxJavaPath = "/usr/lib/jvm";

View File

@ -31,7 +31,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
private readonly SemaphoreSlim disconnectSemaphore; private readonly SemaphoreSlim disconnectSemaphore;
private readonly CancellationToken receiveCancellationToken; private readonly CancellationToken receiveCancellationToken;
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) { private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, socket) {
this.config = config; this.config = config;
this.agentGuid = agentGuid; this.agentGuid = agentGuid;
this.messageListenerFactory = messageListenerFactory; this.messageListenerFactory = messageListenerFactory;
@ -40,7 +40,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
} }
protected override void Connect(ClientSocket socket) { protected override void Connect(ClientSocket socket) {
var logger = config.Logger; var logger = config.RuntimeLogger;
var url = config.TcpUrl; var url = config.TcpUrl;
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url); logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
@ -52,7 +52,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
var connection = new RpcServerConnection(socket, replyTracker); var connection = new RpcServerConnection(socket, replyTracker);
ServerMessaging.SetCurrentConnection(connection); ServerMessaging.SetCurrentConnection(connection);
var logger = config.Logger; var logger = config.RuntimeLogger;
var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken); var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
var keepAliveLoop = new KeepAliveLoop(connection); var keepAliveLoop = new KeepAliveLoop(connection);
@ -93,7 +93,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None); var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None);
var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask); var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
if (finishedTask == unregisterTimeoutTask) { if (finishedTask == unregisterTimeoutTask) {
config.Logger.Error("Timed out communicating agent shutdown with the server."); config.RuntimeLogger.Error("Timed out communicating agent shutdown with the server.");
} }
} }

View File

@ -5,7 +5,7 @@ using Serilog;
namespace Phantom.Agent.Rpc; namespace Phantom.Agent.Rpc;
public static class ServerMessaging { public static class ServerMessaging {
private static readonly ILogger Logger = PhantomLogger.Create(typeof(ServerMessaging)); private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging));
private static RpcServerConnection? CurrentConnection { get; set; } private static RpcServerConnection? CurrentConnection { get; set; }
private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready."); private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");

View File

@ -18,7 +18,7 @@ public sealed class AgentServices {
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) { public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) {
this.AgentFolders = agentFolders; this.AgentFolders = agentFolders;
this.TaskManager = new TaskManager(); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.JavaRuntimeRepository = new JavaRuntimeRepository(); this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager); this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager);
} }
@ -30,10 +30,9 @@ public sealed class AgentServices {
} }
public async Task Shutdown() { public async Task Shutdown() {
Logger.Information("Stopping instances..."); Logger.Information("Stopping services...");
await InstanceSessionManager.StopAll();
Logger.Information("Stopping task manager..."); await InstanceSessionManager.StopAll();
await TaskManager.Stop(); await TaskManager.Stop();
Logger.Information("Services stopped."); Logger.Information("Services stopped.");

View File

@ -143,7 +143,7 @@ sealed class Instance : IDisposable {
public override void ReportStatus(IInstanceStatus newStatus) { public override void ReportStatus(IInstanceStatus newStatus) {
int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter); int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter);
instance.launchServices.TaskManager.Run(async () => { instance.launchServices.TaskManager.Run("Report status of instance " + instance.shortName + " as " + newStatus.GetType().Name, async () => {
if (myStatusUpdateCounter == statusUpdateCounter) { if (myStatusUpdateCounter == statusUpdateCounter) {
instance.currentStatus = newStatus; instance.currentStatus = newStatus;
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus)); await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));

View File

@ -24,7 +24,7 @@ sealed class InstanceLogSender {
this.logger = PhantomLogger.Create<InstanceLogSender>(name); this.logger = PhantomLogger.Create<InstanceLogSender>(name);
this.cancellationTokenSource = new CancellationTokenSource(); this.cancellationTokenSource = new CancellationTokenSource();
this.cancellationToken = cancellationTokenSource.Token; this.cancellationToken = cancellationTokenSource.Token;
taskManager.Run(Run); taskManager.Run("Instance log sender for " + name, Run);
} }
private async Task Run() { private async Task Run() {

View File

@ -132,6 +132,8 @@ sealed class InstanceSessionManager : IDisposable {
public async Task StopAll() { public async Task StopAll() {
shutdownCancellationTokenSource.Cancel(); shutdownCancellationTokenSource.Cancel();
Logger.Information("Stopping all instances...");
await semaphore.WaitAsync(CancellationToken.None); await semaphore.WaitAsync(CancellationToken.None);
try { try {

View File

@ -19,7 +19,7 @@ sealed class InstanceLaunchingState : IInstanceState, IDisposable {
public void Initialize() { public void Initialize() {
context.Logger.Information("Session starting..."); context.Logger.Information("Session starting...");
var launchTask = context.LaunchServices.TaskManager.Run(DoLaunch); var launchTask = context.LaunchServices.TaskManager.Run("Launch procedure for instance " + context.ShortName, DoLaunch);
launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default); launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default); launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default);
} }

View File

@ -30,7 +30,7 @@ sealed class InstanceRunningState : IInstanceState {
if (session.HasEnded) { if (session.HasEnded) {
if (sessionObjects.Dispose()) { if (sessionObjects.Dispose()) {
context.Logger.Warning("Session ended immediately after it was started."); context.Logger.Warning("Session ended immediately after it was started.");
context.LaunchServices.TaskManager.Run(() => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError))); context.LaunchServices.TaskManager.Run("Transition state of instance " + context.ShortName + " to not running", () => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError)));
} }
} }
else { else {
@ -75,7 +75,7 @@ sealed class InstanceRunningState : IInstanceState {
} }
isStopping = true; isStopping = true;
context.LaunchServices.TaskManager.Run(() => StopLater(stopStrategy.Seconds)); context.LaunchServices.TaskManager.Run("Delayed stop timer for instance " + context.ShortName, () => StopLater(stopStrategy.Seconds));
return (this, StopInstanceResult.StopInitiated); return (this, StopInstanceResult.StopInitiated);
} }

View File

@ -21,7 +21,7 @@ sealed class InstanceStoppingState : IInstanceState, IDisposable {
public void Initialize() { public void Initialize() {
context.Logger.Information("Session stopping."); context.Logger.Information("Session stopping.");
context.ReportStatus(InstanceStatus.Stopping); context.ReportStatus(InstanceStatus.Stopping);
context.LaunchServices.TaskManager.Run(DoStop); context.LaunchServices.TaskManager.Run("Stop procedure for instance " + context.ShortName, DoStop);
} }
private async Task DoStop() { private async Task DoStop() {

View File

@ -8,7 +8,7 @@ using Serilog;
namespace Phantom.Agent; namespace Phantom.Agent;
static class AgentKey { static class AgentKey {
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(AgentKey)); private static ILogger Logger { get; } = PhantomLogger.Create(nameof(AgentKey));
public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) { public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) {
if (agentKeyFilePath != null) { if (agentKeyFilePath != null) {

View File

@ -6,7 +6,7 @@ using Serilog;
namespace Phantom.Agent; namespace Phantom.Agent;
static class GuidFile { static class GuidFile {
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(GuidFile)); private static ILogger Logger { get; } = PhantomLogger.Create(nameof(GuidFile));
private const string GuidFileName = "agent.guid"; private const string GuidFileName = "agent.guid";

View File

@ -53,7 +53,8 @@ try {
await agentServices.Initialize(); await agentServices.Initialize();
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1); var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
var rpcTask = RpcLauncher.Launch(new RpcConfiguration(PhantomLogger.Create("Rpc"), serverHost, serverPort, serverCertificate), agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken); var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), serverHost, serverPort, serverCertificate);
var rpcTask = RpcLauncher.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
try { try {
await rpcTask.WaitAsync(shutdownCancellationToken); await rpcTask.WaitAsync(shutdownCancellationToken);
} finally { } finally {

View File

@ -33,8 +33,8 @@ public static class PhantomLogger {
return Base.ForContext("Category", name); return Base.ForContext("Category", name);
} }
public static ILogger Create(Type type) { public static ILogger Create(string name1, string name2) {
return Create(type.Name); return Create(name1 + ":" + name2);
} }
public static ILogger Create<T>() { public static ILogger Create<T>() {
@ -42,11 +42,11 @@ public static class PhantomLogger {
} }
public static ILogger Create<T>(string name) { public static ILogger Create<T>(string name) {
return Create(typeof(T).Name + ":" + name); return Create(typeof(T).Name, name);
} }
public static ILogger Create<T1, T2>() { public static ILogger Create<T1, T2>() {
return Create(typeof(T1).Name + ":" + typeof(T2).Name); return Create(typeof(T1).Name, typeof(T2).Name);
} }
public static void Dispose() { public static void Dispose() {

View File

@ -25,14 +25,14 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory; private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket, config.Logger) { private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, socket) {
this.config = config; this.config = config;
this.listenerFactory = listenerFactory; this.listenerFactory = listenerFactory;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
protected override void Connect(ServerSocket socket) { protected override void Connect(ServerSocket socket) {
var logger = config.Logger; var logger = config.RuntimeLogger;
var url = config.TcpUrl; var url = config.TcpUrl;
logger.Information("Starting ZeroMQ server on {Url}...", url); logger.Information("Starting ZeroMQ server on {Url}...", url);
@ -41,7 +41,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
} }
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) { protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
var logger = config.Logger; var logger = config.RuntimeLogger;
var clients = new Dictionary<ulong, Client>(); var clients = new Dictionary<ulong, Client>();
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) { void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {

View File

@ -32,7 +32,7 @@ public sealed class AgentManager {
this.cancellationToken = configuration.CancellationToken; this.cancellationToken = configuration.CancellationToken;
this.authToken = authToken; this.authToken = authToken;
this.databaseProvider = databaseProvider; this.databaseProvider = databaseProvider;
taskManager.Run(RefreshAgentStatus); taskManager.Run("Refresh agent status loop", RefreshAgentStatus);
} }
public async Task Initialize() { public async Task Initialize() {

View File

@ -36,7 +36,7 @@ public sealed partial class AuditLog {
private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra); var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra);
taskManager.Run(() => AddEventToDatabase(eventEntity)); taskManager.Run("Store audit log event", () => AddEventToDatabase(eventEntity));
} }
private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) { private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {

View File

@ -20,7 +20,7 @@ public sealed class PhantomLoginStore {
private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) { private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
taskManager.Run(RunExpirationLoop); taskManager.Run("Web login entry expiration loop", RunExpirationLoop);
} }
private async Task RunExpirationLoop() { private async Task RunExpirationLoop() {

View File

@ -8,7 +8,7 @@ using Serilog;
namespace Phantom.Server; namespace Phantom.Server;
static class CertificateFiles { static class CertificateFiles {
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(CertificateFiles)); private static ILogger Logger { get; } = PhantomLogger.Create(nameof(CertificateFiles));
private const string SecretKeyFileName = "secret.key"; private const string SecretKeyFileName = "secret.key";
private const string AgentKeyFileName = "agent.key"; private const string AgentKeyFileName = "agent.key";

View File

@ -15,7 +15,7 @@ using WebConfiguration = Phantom.Server.Web.Configuration;
using WebLauncher = Phantom.Server.Web.Launcher; using WebLauncher = Phantom.Server.Web.Launcher;
var cancellationTokenSource = new CancellationTokenSource(); var cancellationTokenSource = new CancellationTokenSource();
var taskManager = new TaskManager(); var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Server"));
PosixSignals.RegisterCancellation(cancellationTokenSource, static () => { PosixSignals.RegisterCancellation(cancellationTokenSource, static () => {
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server..."); PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server...");
@ -53,7 +53,7 @@ try {
var (certificate, agentToken) = certificateData.Value; var (certificate, agentToken) = certificateData.Value;
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), rpcServerHost, rpcServerPort, certificate); var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), rpcServerHost, rpcServerPort, certificate);
var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token); var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
@ -79,7 +79,6 @@ try {
} finally { } finally {
cancellationTokenSource.Cancel(); cancellationTokenSource.Cancel();
PhantomLogger.Root.Information("Stopping task manager...");
await taskManager.Stop(); await taskManager.Stop();
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();

View File

@ -19,7 +19,7 @@ public abstract class MessageHandler<TListener> {
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> { internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
taskManager.Run(async () => { taskManager.Run("Handle message {Type}" + message.GetType().Name, async () => {
try { try {
await Handle<TMessage, TReply>(sequenceId, message); await Handle<TMessage, TReply>(sequenceId, message);
} catch (Exception e) { } catch (Exception e) {

View File

@ -3,6 +3,6 @@ using Serilog;
namespace Phantom.Utils.Rpc; namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(ILogger Logger, string Host, ushort Port, NetMQCertificate ServerCertificate) { public sealed record RpcConfiguration(ILogger RuntimeLogger, ILogger TaskManagerLogger, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string TcpUrl => "tcp://" + Host + ":" + Port; public string TcpUrl => "tcp://" + Host + ":" + Port;
} }

View File

@ -26,17 +26,17 @@ static class RpcRuntime {
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() { public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
private readonly TSocket socket; private readonly TSocket socket;
private readonly ILogger runtimeLogger;
private readonly MessageReplyTracker replyTracker; private readonly MessageReplyTracker replyTracker;
private readonly TaskManager taskManager; private readonly TaskManager taskManager;
private readonly ILogger logger;
protected RpcRuntime(TSocket socket, ILogger logger) { protected RpcRuntime(RpcConfiguration configuration, TSocket socket) {
RpcRuntime.MarkRuntimeCreated(); RpcRuntime.MarkRuntimeCreated();
RpcRuntime.SetDefaultSocketOptions(socket.Options); RpcRuntime.SetDefaultSocketOptions(socket.Options);
this.socket = socket; this.socket = socket;
this.logger = logger; this.runtimeLogger = configuration.RuntimeLogger;
this.replyTracker = new MessageReplyTracker(logger); this.replyTracker = new MessageReplyTracker(runtimeLogger);
this.taskManager = new TaskManager(); this.taskManager = new TaskManager(configuration.TaskManagerLogger);
} }
protected async Task Launch() { protected async Task Launch() {
@ -51,13 +51,12 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// ignore // ignore
} finally { } finally {
logger.Information("Stopping task manager...");
await taskManager.Stop(); await taskManager.Stop();
await Disconnect(); await Disconnect();
socket.Dispose(); socket.Dispose();
NetMQConfig.Cleanup(); NetMQConfig.Cleanup();
logger.Information("ZeroMQ client stopped."); runtimeLogger.Information("ZeroMQ client stopped.");
} }
} }

View File

@ -5,6 +5,10 @@
<ImplicitUsings>enable</ImplicitUsings> <ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" /> <ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" />

View File

@ -1,21 +1,24 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using Phantom.Utils.Collections; using Phantom.Utils.Collections;
using Serilog;
namespace Phantom.Utils.Runtime; namespace Phantom.Utils.Runtime;
public sealed class TaskManager { public sealed class TaskManager {
private readonly ILogger logger;
private readonly CancellationTokenSource cancellationTokenSource = new (); private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private readonly ConcurrentDictionary<Task, object?> runningTasks = new (ReferenceEqualityComparer<Task>.Instance); private readonly ConcurrentDictionary<Task, string> runningTasks = new (ReferenceEqualityComparer<Task>.Instance);
public TaskManager() { public TaskManager(ILogger logger) {
cancellationToken = cancellationTokenSource.Token; this.logger = logger;
this.cancellationToken = cancellationTokenSource.Token;
} }
private T Add<T>(T task) where T : Task { private T Add<T>(string name, T task) where T : Task {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
runningTasks.TryAdd(task, null); runningTasks.TryAdd(task, name);
task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default); task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default);
return task; return task;
} }
@ -24,21 +27,44 @@ public sealed class TaskManager {
runningTasks.TryRemove(task, out _); runningTasks.TryRemove(task, out _);
} }
public Task Run(Action action) { public Task Run(string name, Action action) {
return Add(Task.Run(action, cancellationToken)); return Add(name, Task.Run(action, cancellationToken));
} }
public Task Run(Func<Task> taskFunc) { public Task Run(string name, Func<Task> taskFunc) {
return Add(Task.Run(taskFunc, cancellationToken)); return Add(name, Task.Run(taskFunc, cancellationToken));
} }
public Task<T> Run<T>(Func<Task<T>> taskFunc) { public Task<T> Run<T>(string name, Func<Task<T>> taskFunc) {
return Add(Task.Run(taskFunc, cancellationToken)); return Add(name, Task.Run(taskFunc, cancellationToken));
} }
public async Task Stop() { public async Task Stop() {
cancellationTokenSource.Cancel(); logger.Information("Stopping task manager...");
cancellationTokenSource.Cancel();
var remainingTasksAwaiterTask = WaitForRemainingTasks();
while (true) {
var logStateTimeoutTask = Task.Delay(TimeSpan.FromSeconds(10), CancellationToken.None);
var completedTask = await Task.WhenAny(remainingTasksAwaiterTask, logStateTimeoutTask);
if (completedTask == logStateTimeoutTask) {
var remainingTaskNames = runningTasks.Values.Order().ToList();
var remainingTaskNameList = string.Join('\n', remainingTaskNames.Select(static name => "- " + name));
logger.Warning("Waiting for {TaskCount} task(s) to finish:\n{TaskNames}", remainingTaskNames.Count, remainingTaskNameList);
}
else {
break;
}
}
runningTasks.Clear();
cancellationTokenSource.Dispose();
logger.Information("Task manager stopped.");
}
private async Task WaitForRemainingTasks() {
foreach (var task in runningTasks.Keys) { foreach (var task in runningTasks.Keys) {
try { try {
await task; await task;
@ -46,8 +72,5 @@ public sealed class TaskManager {
// ignored // ignored
} }
} }
runningTasks.Clear();
cancellationTokenSource.Dispose();
} }
} }