1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-04-10 20:15:44 +02:00

Fix graceful shutdown issues

This commit is contained in:
chylex 2022-10-14 15:41:46 +02:00
parent 2b661fd170
commit c4b1d3c920
Signed by: chylex
GPG Key ID: 4DE42C8F19A80548
12 changed files with 73 additions and 41 deletions
Agent
Phantom.Agent.Rpc
Phantom.Agent.Services
Phantom.Agent
Server
Phantom.Server.Rpc
Phantom.Server.Services
Phantom.Server.Web/Authentication
Phantom.Server
Utils
Phantom.Utils.Rpc
Phantom.Utils.Runtime

View File

@ -4,6 +4,7 @@ using Phantom.Common.Data.Agent;
using Phantom.Common.Messages;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Rpc;
using Phantom.Utils.Threading;
using Serilog.Events;
namespace Phantom.Agent.Rpc;
@ -24,7 +25,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
private readonly Guid agentGuid;
private readonly Func<ClientSocket, IMessageToAgentListener> messageListenerFactory;
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<ClientSocket, IMessageToAgentListener> messageListenerFactory) : base(socket, config.CancellationToken) {
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<ClientSocket, IMessageToAgentListener> messageListenerFactory) : base(socket, config.Logger) {
this.config = config;
this.agentGuid = agentGuid;
this.messageListenerFactory = messageListenerFactory;
@ -39,9 +40,8 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
logger.Information("ZeroMQ client ready.");
}
protected override async Task Run(ClientSocket socket) {
protected override async Task Run(ClientSocket socket, TaskManager taskManager) {
var logger = config.Logger;
var taskManager = config.TaskManager;
var cancellationToken = config.CancellationToken;
var listener = messageListenerFactory(socket);

View File

@ -27,6 +27,5 @@ public sealed class AgentServices {
public async Task Shutdown() {
await InstanceSessionManager.StopAll();
await TaskManager.Stop();
}
}

View File

@ -27,7 +27,7 @@ sealed class InstanceLogSender {
taskManager.Run(Run);
}
private async void Run() {
private async Task Run() {
logger.Verbose("Task started.");
try {

View File

@ -11,6 +11,7 @@ using Phantom.Utils.Threading;
const int AgentVersion = 1;
var cancellationTokenSource = new CancellationTokenSource();
var taskManager = new TaskManager();
PosixSignals.RegisterCancellation(cancellationTokenSource, static () => {
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel agent...");
@ -47,18 +48,24 @@ try {
return;
}
var taskManager = new TaskManager();
var agentInfo = new AgentInfo(agentGuid.Value, agentName, AgentVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders, taskManager);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel agent...");
await agentServices.Initialize();
await RpcLauncher.Launch(new RpcConfiguration(PhantomLogger.Create("Rpc"), serverHost, serverPort, serverCertificate, taskManager, cancellationTokenSource.Token), agentAuthToken, agentInfo, socket => new MessageListener(socket, agentServices, cancellationTokenSource));
await agentServices.Shutdown();
try {
await RpcLauncher.Launch(new RpcConfiguration(PhantomLogger.Create("Rpc"), serverHost, serverPort, serverCertificate, cancellationTokenSource.Token), agentAuthToken, agentInfo, socket => new MessageListener(socket, agentServices, cancellationTokenSource));
} finally {
cancellationTokenSource.Cancel();
await agentServices.Shutdown();
}
} catch (OperationCanceledException) {
// Ignore.
} finally {
PhantomLogger.Root.Information("Stopping task manager...");
await taskManager.Stop();
cancellationTokenSource.Dispose();
PhantomLogger.Root.Information("Bye!");
PhantomLogger.Dispose();

View File

@ -2,6 +2,7 @@
using NetMQ.Sockets;
using Phantom.Common.Messages;
using Phantom.Utils.Rpc;
using Phantom.Utils.Threading;
using Serilog.Events;
namespace Phantom.Server.Rpc;
@ -20,7 +21,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
private readonly RpcConfiguration config;
private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory;
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory) : base(socket, config.CancellationToken) {
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory) : base(socket, config.Logger) {
this.config = config;
this.listenerFactory = listenerFactory;
}
@ -34,9 +35,8 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
logger.Information("ZeroMQ server initialized, listening for agent connections on port {Port}.", config.Port);
}
protected override async Task Run(ServerSocket socket) {
protected override async Task Run(ServerSocket socket, TaskManager taskManager) {
var logger = config.Logger;
var taskManager = config.TaskManager;
var cancellationToken = config.CancellationToken;
var clients = new Dictionary<ulong, Client>();

View File

@ -1,9 +1,6 @@
using Phantom.Utils.Threading;
namespace Phantom.Server.Services;
namespace Phantom.Server.Services;
public sealed record ServiceConfiguration(
byte[] AdministratorToken,
TaskManager TaskManager,
CancellationToken CancellationToken
);

View File

@ -3,6 +3,7 @@ using System.Diagnostics;
using Microsoft.AspNetCore.Identity;
using Phantom.Common.Logging;
using Phantom.Server.Services;
using Phantom.Utils.Threading;
using ILogger = Serilog.ILogger;
namespace Phantom.Server.Web.Authentication;
@ -14,9 +15,9 @@ sealed class PhantomLoginStore {
private readonly ConcurrentDictionary<string, LoginEntry> loginEntries = new ();
private readonly CancellationToken cancellationToken;
public PhantomLoginStore(ServiceConfiguration serviceConfiguration) {
this.cancellationToken = serviceConfiguration.CancellationToken;
serviceConfiguration.TaskManager.Run(RunExpirationLoop);
public PhantomLoginStore(ServiceConfiguration configuration, TaskManager taskManager) {
this.cancellationToken = configuration.CancellationToken;
taskManager.Run(RunExpirationLoop);
}
private async Task RunExpirationLoop() {

View File

@ -15,6 +15,7 @@ using WebConfiguration = Phantom.Server.Web.Configuration;
using WebLauncher = Phantom.Server.Web.Launcher;
var cancellationTokenSource = new CancellationTokenSource();
var taskManager = new TaskManager();
PosixSignals.RegisterCancellation(cancellationTokenSource, static () => {
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server...");
@ -45,8 +46,7 @@ try {
Environment.Exit(1);
}
var taskManager = new TaskManager();
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), rpcServerHost, rpcServerPort, certificate, taskManager, cancellationTokenSource.Token);
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), rpcServerHost, rpcServerPort, certificate, cancellationTokenSource.Token);
var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, cancellationTokenSource.Token);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
@ -55,8 +55,8 @@ try {
PhantomLogger.Root.Information("Your administrator token is: {AdministratorToken}", administratorToken);
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
var serviceConfiguration = new ServiceConfiguration(TokenGenerator.GetBytesOrThrow(administratorToken), taskManager, cancellationTokenSource.Token);
var webConfigurator = new WebConfigurator(agentToken, serviceConfiguration);
var serviceConfiguration = new ServiceConfiguration(TokenGenerator.GetBytesOrThrow(administratorToken), cancellationTokenSource.Token);
var webConfigurator = new WebConfigurator(serviceConfiguration, taskManager, agentToken);
var webApplication = await WebLauncher.CreateApplication(webConfiguration, webConfigurator, options => options.UseNpgsql(sqlConnectionString, static options => {
options.CommandTimeout(10).MigrationsAssembly(typeof(ApplicationDbContextDesignFactory).Assembly.FullName);
}));
@ -65,11 +65,14 @@ try {
RpcLauncher.Launch(rpcConfiguration, webApplication.Services.GetRequiredService<MessageToServerListenerFactory>().CreateListener),
WebLauncher.Launch(webConfiguration, webApplication)
);
await taskManager.Stop();
} catch (OperationCanceledException) {
// Ignore.
} finally {
cancellationTokenSource.Cancel();
PhantomLogger.Root.Information("Stopping task manager...");
await taskManager.Stop();
cancellationTokenSource.Dispose();
PhantomLogger.Root.Information("Bye!");
PhantomLogger.Dispose();

View File

@ -5,21 +5,25 @@ using Phantom.Server.Services;
using Phantom.Server.Services.Agents;
using Phantom.Server.Services.Instances;
using Phantom.Server.Services.Rpc;
using Phantom.Utils.Threading;
using WebLauncher = Phantom.Server.Web.Launcher;
namespace Phantom.Server;
sealed class WebConfigurator : WebLauncher.IConfigurator {
private readonly AgentAuthToken agentToken;
private readonly ServiceConfiguration serviceConfiguration;
private readonly TaskManager taskManager;
private readonly AgentAuthToken agentToken;
public WebConfigurator(AgentAuthToken agentToken, ServiceConfiguration serviceConfiguration) {
this.agentToken = agentToken;
public WebConfigurator(ServiceConfiguration serviceConfiguration, TaskManager taskManager, AgentAuthToken agentToken) {
this.serviceConfiguration = serviceConfiguration;
this.taskManager = taskManager;
this.agentToken = agentToken;
}
public void ConfigureServices(IServiceCollection services) {
services.AddSingleton(serviceConfiguration);
services.AddSingleton(taskManager);
services.AddSingleton(agentToken);
services.AddSingleton<AgentManager>();
services.AddSingleton<AgentJavaRuntimesManager>();

View File

@ -1,9 +1,8 @@
using NetMQ;
using Phantom.Utils.Threading;
using Serilog;
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(ILogger Logger, string Host, ushort Port, NetMQCertificate ServerCertificate, TaskManager TaskManager, CancellationToken CancellationToken) {
public sealed record RpcConfiguration(ILogger Logger, string Host, ushort Port, NetMQCertificate ServerCertificate, CancellationToken CancellationToken) {
public string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@ -1,4 +1,6 @@
using NetMQ;
using Phantom.Utils.Threading;
using Serilog;
namespace Phantom.Utils.Rpc;
@ -23,30 +25,37 @@ static class RpcRuntime {
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
private readonly TSocket socket;
private readonly TaskManager taskManager;
private readonly ILogger logger;
protected RpcRuntime(TSocket socket, CancellationToken cancellationToken) {
protected RpcRuntime(TSocket socket, ILogger logger) {
RpcRuntime.MarkRuntimeCreated();
RpcRuntime.SetDefaultSocketOptions(socket.Options);
this.socket = socket;
this.logger = logger;
this.taskManager = new TaskManager();
}
protected async Task Launch() {
Connect(socket);
try {
await Run(socket);
await Run(socket, taskManager);
} catch (OperationCanceledException) {
// ignore
} finally {
// TODO wait for all tasks started by MessageRegistry.Handle to complete
logger.Information("Stopping task manager...");
await taskManager.Stop();
await Disconnect(socket);
socket.Dispose();
NetMQConfig.Cleanup();
logger.Information("ZeroMQ client stopped.");
}
}
protected abstract void Connect(TSocket socket);
protected abstract Task Run(TSocket socket);
protected abstract Task Run(TSocket socket, TaskManager taskManager);
protected virtual Task Disconnect(TSocket socket) {
return Task.CompletedTask;

View File

@ -4,10 +4,12 @@ namespace Phantom.Utils.Runtime;
public static class PosixSignals {
public static void RegisterCancellation(CancellationTokenSource cancellationTokenSource, Action? callback = null) {
var shutdown = new CancellationCallback(cancellationTokenSource, callback).Run;
PosixSignalRegistration.Create(PosixSignal.SIGINT, shutdown);
PosixSignalRegistration.Create(PosixSignal.SIGTERM, shutdown);
PosixSignalRegistration.Create(PosixSignal.SIGQUIT, shutdown);
var cancellationCallback = new CancellationCallback(cancellationTokenSource, callback);
var handlePosixSignal = cancellationCallback.HandlePosixSignal;
PosixSignalRegistration.Create(PosixSignal.SIGINT, handlePosixSignal);
PosixSignalRegistration.Create(PosixSignal.SIGTERM, handlePosixSignal);
PosixSignalRegistration.Create(PosixSignal.SIGQUIT, handlePosixSignal);
Console.CancelKeyPress += cancellationCallback.HandleConsoleCancel;
}
private sealed class CancellationCallback {
@ -19,11 +21,22 @@ public static class PosixSignals {
this.callback = callback;
}
public void Run(PosixSignalContext context) {
public void HandlePosixSignal(PosixSignalContext context) {
context.Cancel = true;
if (!cancellationTokenSource.IsCancellationRequested) {
cancellationTokenSource.Cancel();
callback?.Invoke();
Run();
}
public void HandleConsoleCancel(object? sender, ConsoleCancelEventArgs e) {
e.Cancel = true;
Run();
}
private void Run() {
lock (this) {
if (!cancellationTokenSource.IsCancellationRequested) {
cancellationTokenSource.Cancel();
callback?.Invoke();
}
}
}
}