mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 16:42:54 +01:00
Compare commits
2 Commits
3c10e1a8f9
...
d2e7f4f876
Author | SHA1 | Date | |
---|---|---|---|
d2e7f4f876 | |||
c4cf45776d |
@ -7,7 +7,7 @@ using Serilog;
|
|||||||
namespace Phantom.Agent.Minecraft.Java;
|
namespace Phantom.Agent.Minecraft.Java;
|
||||||
|
|
||||||
public sealed class JavaRuntimeDiscovery {
|
public sealed class JavaRuntimeDiscovery {
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create(typeof(JavaRuntimeDiscovery));
|
private static readonly ILogger Logger = PhantomLogger.Create(nameof(JavaRuntimeDiscovery));
|
||||||
|
|
||||||
public static string? GetSystemSearchPath() {
|
public static string? GetSystemSearchPath() {
|
||||||
const string LinuxJavaPath = "/usr/lib/jvm";
|
const string LinuxJavaPath = "/usr/lib/jvm";
|
||||||
|
@ -31,7 +31,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
|||||||
private readonly SemaphoreSlim disconnectSemaphore;
|
private readonly SemaphoreSlim disconnectSemaphore;
|
||||||
private readonly CancellationToken receiveCancellationToken;
|
private readonly CancellationToken receiveCancellationToken;
|
||||||
|
|
||||||
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(socket, config.Logger) {
|
private RpcLauncher(RpcConfiguration config, ClientSocket socket, Guid agentGuid, Func<RpcServerConnection, IMessageToAgentListener> messageListenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) : base(config, socket) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.agentGuid = agentGuid;
|
this.agentGuid = agentGuid;
|
||||||
this.messageListenerFactory = messageListenerFactory;
|
this.messageListenerFactory = messageListenerFactory;
|
||||||
@ -40,7 +40,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected override void Connect(ClientSocket socket) {
|
protected override void Connect(ClientSocket socket) {
|
||||||
var logger = config.Logger;
|
var logger = config.RuntimeLogger;
|
||||||
var url = config.TcpUrl;
|
var url = config.TcpUrl;
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
logger.Information("Starting ZeroMQ client and connecting to {Url}...", url);
|
||||||
@ -52,7 +52,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
|||||||
var connection = new RpcServerConnection(socket, replyTracker);
|
var connection = new RpcServerConnection(socket, replyTracker);
|
||||||
ServerMessaging.SetCurrentConnection(connection);
|
ServerMessaging.SetCurrentConnection(connection);
|
||||||
|
|
||||||
var logger = config.Logger;
|
var logger = config.RuntimeLogger;
|
||||||
var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
|
var handler = new MessageToAgentHandler(messageListenerFactory(connection), logger, taskManager, receiveCancellationToken);
|
||||||
var keepAliveLoop = new KeepAliveLoop(connection);
|
var keepAliveLoop = new KeepAliveLoop(connection);
|
||||||
|
|
||||||
@ -93,7 +93,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
|
|||||||
var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None);
|
var unregisterTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), CancellationToken.None);
|
||||||
var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
|
var finishedTask = await Task.WhenAny(ServerMessaging.Send(new UnregisterAgentMessage(agentGuid)), unregisterTimeoutTask);
|
||||||
if (finishedTask == unregisterTimeoutTask) {
|
if (finishedTask == unregisterTimeoutTask) {
|
||||||
config.Logger.Error("Timed out communicating agent shutdown with the server.");
|
config.RuntimeLogger.Error("Timed out communicating agent shutdown with the server.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,7 +5,7 @@ using Serilog;
|
|||||||
namespace Phantom.Agent.Rpc;
|
namespace Phantom.Agent.Rpc;
|
||||||
|
|
||||||
public static class ServerMessaging {
|
public static class ServerMessaging {
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create(typeof(ServerMessaging));
|
private static readonly ILogger Logger = PhantomLogger.Create(nameof(ServerMessaging));
|
||||||
|
|
||||||
private static RpcServerConnection? CurrentConnection { get; set; }
|
private static RpcServerConnection? CurrentConnection { get; set; }
|
||||||
private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
|
private static RpcServerConnection CurrentConnectionOrThrow => CurrentConnection ?? throw new InvalidOperationException("Server connection not ready.");
|
||||||
|
@ -18,7 +18,7 @@ public sealed class AgentServices {
|
|||||||
|
|
||||||
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) {
|
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) {
|
||||||
this.AgentFolders = agentFolders;
|
this.AgentFolders = agentFolders;
|
||||||
this.TaskManager = new TaskManager();
|
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
|
||||||
this.JavaRuntimeRepository = new JavaRuntimeRepository();
|
this.JavaRuntimeRepository = new JavaRuntimeRepository();
|
||||||
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager);
|
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager);
|
||||||
}
|
}
|
||||||
@ -30,10 +30,9 @@ public sealed class AgentServices {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async Task Shutdown() {
|
public async Task Shutdown() {
|
||||||
Logger.Information("Stopping instances...");
|
Logger.Information("Stopping services...");
|
||||||
await InstanceSessionManager.StopAll();
|
|
||||||
|
|
||||||
Logger.Information("Stopping task manager...");
|
await InstanceSessionManager.StopAll();
|
||||||
await TaskManager.Stop();
|
await TaskManager.Stop();
|
||||||
|
|
||||||
Logger.Information("Services stopped.");
|
Logger.Information("Services stopped.");
|
||||||
|
@ -143,7 +143,7 @@ sealed class Instance : IDisposable {
|
|||||||
public override void ReportStatus(IInstanceStatus newStatus) {
|
public override void ReportStatus(IInstanceStatus newStatus) {
|
||||||
int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter);
|
int myStatusUpdateCounter = Interlocked.Increment(ref statusUpdateCounter);
|
||||||
|
|
||||||
instance.launchServices.TaskManager.Run(async () => {
|
instance.launchServices.TaskManager.Run("Report status of instance " + instance.shortName + " as " + newStatus.GetType().Name, async () => {
|
||||||
if (myStatusUpdateCounter == statusUpdateCounter) {
|
if (myStatusUpdateCounter == statusUpdateCounter) {
|
||||||
instance.currentStatus = newStatus;
|
instance.currentStatus = newStatus;
|
||||||
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
|
await ServerMessaging.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, newStatus));
|
||||||
|
@ -24,7 +24,7 @@ sealed class InstanceLogSender {
|
|||||||
this.logger = PhantomLogger.Create<InstanceLogSender>(name);
|
this.logger = PhantomLogger.Create<InstanceLogSender>(name);
|
||||||
this.cancellationTokenSource = new CancellationTokenSource();
|
this.cancellationTokenSource = new CancellationTokenSource();
|
||||||
this.cancellationToken = cancellationTokenSource.Token;
|
this.cancellationToken = cancellationTokenSource.Token;
|
||||||
taskManager.Run(Run);
|
taskManager.Run("Instance log sender for " + name, Run);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Run() {
|
private async Task Run() {
|
||||||
|
@ -132,6 +132,8 @@ sealed class InstanceSessionManager : IDisposable {
|
|||||||
|
|
||||||
public async Task StopAll() {
|
public async Task StopAll() {
|
||||||
shutdownCancellationTokenSource.Cancel();
|
shutdownCancellationTokenSource.Cancel();
|
||||||
|
|
||||||
|
Logger.Information("Stopping all instances...");
|
||||||
|
|
||||||
await semaphore.WaitAsync(CancellationToken.None);
|
await semaphore.WaitAsync(CancellationToken.None);
|
||||||
try {
|
try {
|
||||||
|
@ -19,7 +19,7 @@ sealed class InstanceLaunchingState : IInstanceState, IDisposable {
|
|||||||
public void Initialize() {
|
public void Initialize() {
|
||||||
context.Logger.Information("Session starting...");
|
context.Logger.Information("Session starting...");
|
||||||
|
|
||||||
var launchTask = context.LaunchServices.TaskManager.Run(DoLaunch);
|
var launchTask = context.LaunchServices.TaskManager.Run("Launch procedure for instance " + context.ShortName, DoLaunch);
|
||||||
launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
|
launchTask.ContinueWith(OnLaunchSuccess, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
|
||||||
launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default);
|
launchTask.ContinueWith(OnLaunchFailure, CancellationToken.None, TaskContinuationOptions.NotOnRanToCompletion, TaskScheduler.Default);
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ sealed class InstanceRunningState : IInstanceState {
|
|||||||
if (session.HasEnded) {
|
if (session.HasEnded) {
|
||||||
if (sessionObjects.Dispose()) {
|
if (sessionObjects.Dispose()) {
|
||||||
context.Logger.Warning("Session ended immediately after it was started.");
|
context.Logger.Warning("Session ended immediately after it was started.");
|
||||||
context.LaunchServices.TaskManager.Run(() => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError)));
|
context.LaunchServices.TaskManager.Run("Transition state of instance " + context.ShortName + " to not running", () => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -75,7 +75,7 @@ sealed class InstanceRunningState : IInstanceState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
isStopping = true;
|
isStopping = true;
|
||||||
context.LaunchServices.TaskManager.Run(() => StopLater(stopStrategy.Seconds));
|
context.LaunchServices.TaskManager.Run("Delayed stop timer for instance " + context.ShortName, () => StopLater(stopStrategy.Seconds));
|
||||||
return (this, StopInstanceResult.StopInitiated);
|
return (this, StopInstanceResult.StopInitiated);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ sealed class InstanceStoppingState : IInstanceState, IDisposable {
|
|||||||
public void Initialize() {
|
public void Initialize() {
|
||||||
context.Logger.Information("Session stopping.");
|
context.Logger.Information("Session stopping.");
|
||||||
context.ReportStatus(InstanceStatus.Stopping);
|
context.ReportStatus(InstanceStatus.Stopping);
|
||||||
context.LaunchServices.TaskManager.Run(DoStop);
|
context.LaunchServices.TaskManager.Run("Stop procedure for instance " + context.ShortName, DoStop);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DoStop() {
|
private async Task DoStop() {
|
||||||
|
@ -8,7 +8,7 @@ using Serilog;
|
|||||||
namespace Phantom.Agent;
|
namespace Phantom.Agent;
|
||||||
|
|
||||||
static class AgentKey {
|
static class AgentKey {
|
||||||
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(AgentKey));
|
private static ILogger Logger { get; } = PhantomLogger.Create(nameof(AgentKey));
|
||||||
|
|
||||||
public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) {
|
public static Task<(NetMQCertificate, AgentAuthToken)?> Load(string? agentKeyToken, string? agentKeyFilePath) {
|
||||||
if (agentKeyFilePath != null) {
|
if (agentKeyFilePath != null) {
|
||||||
|
@ -6,7 +6,7 @@ using Serilog;
|
|||||||
namespace Phantom.Agent;
|
namespace Phantom.Agent;
|
||||||
|
|
||||||
static class GuidFile {
|
static class GuidFile {
|
||||||
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(GuidFile));
|
private static ILogger Logger { get; } = PhantomLogger.Create(nameof(GuidFile));
|
||||||
|
|
||||||
private const string GuidFileName = "agent.guid";
|
private const string GuidFileName = "agent.guid";
|
||||||
|
|
||||||
|
@ -53,7 +53,8 @@ try {
|
|||||||
await agentServices.Initialize();
|
await agentServices.Initialize();
|
||||||
|
|
||||||
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
var rpcDisconnectSemaphore = new SemaphoreSlim(0, 1);
|
||||||
var rpcTask = RpcLauncher.Launch(new RpcConfiguration(PhantomLogger.Create("Rpc"), serverHost, serverPort, serverCertificate), agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
|
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), serverHost, serverPort, serverCertificate);
|
||||||
|
var rpcTask = RpcLauncher.Launch(rpcConfiguration, agentToken, agentInfo, MessageListenerFactory, rpcDisconnectSemaphore, shutdownCancellationToken);
|
||||||
try {
|
try {
|
||||||
await rpcTask.WaitAsync(shutdownCancellationToken);
|
await rpcTask.WaitAsync(shutdownCancellationToken);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -33,8 +33,8 @@ public static class PhantomLogger {
|
|||||||
return Base.ForContext("Category", name);
|
return Base.ForContext("Category", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create(Type type) {
|
public static ILogger Create(string name1, string name2) {
|
||||||
return Create(type.Name);
|
return Create(name1 + ":" + name2);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>() {
|
public static ILogger Create<T>() {
|
||||||
@ -42,11 +42,11 @@ public static class PhantomLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>(string name) {
|
public static ILogger Create<T>(string name) {
|
||||||
return Create(typeof(T).Name + ":" + name);
|
return Create(typeof(T).Name, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>() {
|
public static ILogger Create<T1, T2>() {
|
||||||
return Create(typeof(T1).Name + ":" + typeof(T2).Name);
|
return Create(typeof(T1).Name, typeof(T2).Name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void Dispose() {
|
public static void Dispose() {
|
||||||
|
@ -25,14 +25,14 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
|
|||||||
private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory;
|
private readonly Func<RpcClientConnection, IMessageToServerListener> listenerFactory;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket, config.Logger) {
|
private RpcLauncher(RpcConfiguration config, ServerSocket socket, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) : base(config, socket) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.listenerFactory = listenerFactory;
|
this.listenerFactory = listenerFactory;
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override void Connect(ServerSocket socket) {
|
protected override void Connect(ServerSocket socket) {
|
||||||
var logger = config.Logger;
|
var logger = config.RuntimeLogger;
|
||||||
var url = config.TcpUrl;
|
var url = config.TcpUrl;
|
||||||
|
|
||||||
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
logger.Information("Starting ZeroMQ server on {Url}...", url);
|
||||||
@ -41,7 +41,7 @@ public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
protected override void Run(ServerSocket socket, MessageReplyTracker replyTracker, TaskManager taskManager) {
|
||||||
var logger = config.Logger;
|
var logger = config.RuntimeLogger;
|
||||||
var clients = new Dictionary<ulong, Client>();
|
var clients = new Dictionary<ulong, Client>();
|
||||||
|
|
||||||
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
||||||
|
@ -32,7 +32,7 @@ public sealed class AgentManager {
|
|||||||
this.cancellationToken = configuration.CancellationToken;
|
this.cancellationToken = configuration.CancellationToken;
|
||||||
this.authToken = authToken;
|
this.authToken = authToken;
|
||||||
this.databaseProvider = databaseProvider;
|
this.databaseProvider = databaseProvider;
|
||||||
taskManager.Run(RefreshAgentStatus);
|
taskManager.Run("Refresh agent status loop", RefreshAgentStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Initialize() {
|
public async Task Initialize() {
|
||||||
|
@ -36,7 +36,7 @@ public sealed partial class AuditLog {
|
|||||||
|
|
||||||
private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
|
private void AddEvent(string? userId, AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
|
||||||
var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra);
|
var eventEntity = new AuditEventEntity(userId, eventType, subjectId, extra);
|
||||||
taskManager.Run(() => AddEventToDatabase(eventEntity));
|
taskManager.Run("Store audit log event", () => AddEventToDatabase(eventEntity));
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
|
private async Task AddEvent(AuditEventType eventType, string subjectId, Dictionary<string, object?>? extra = null) {
|
||||||
|
@ -20,7 +20,7 @@ public sealed class PhantomLoginStore {
|
|||||||
|
|
||||||
private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) {
|
private PhantomLoginStore(TaskManager taskManager, CancellationToken cancellationToken) {
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
taskManager.Run(RunExpirationLoop);
|
taskManager.Run("Web login entry expiration loop", RunExpirationLoop);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunExpirationLoop() {
|
private async Task RunExpirationLoop() {
|
||||||
|
@ -8,7 +8,7 @@ using Serilog;
|
|||||||
namespace Phantom.Server;
|
namespace Phantom.Server;
|
||||||
|
|
||||||
static class CertificateFiles {
|
static class CertificateFiles {
|
||||||
private static ILogger Logger { get; } = PhantomLogger.Create(typeof(CertificateFiles));
|
private static ILogger Logger { get; } = PhantomLogger.Create(nameof(CertificateFiles));
|
||||||
|
|
||||||
private const string SecretKeyFileName = "secret.key";
|
private const string SecretKeyFileName = "secret.key";
|
||||||
private const string AgentKeyFileName = "agent.key";
|
private const string AgentKeyFileName = "agent.key";
|
||||||
|
@ -15,7 +15,7 @@ using WebConfiguration = Phantom.Server.Web.Configuration;
|
|||||||
using WebLauncher = Phantom.Server.Web.Launcher;
|
using WebLauncher = Phantom.Server.Web.Launcher;
|
||||||
|
|
||||||
var cancellationTokenSource = new CancellationTokenSource();
|
var cancellationTokenSource = new CancellationTokenSource();
|
||||||
var taskManager = new TaskManager();
|
var taskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Server"));
|
||||||
|
|
||||||
PosixSignals.RegisterCancellation(cancellationTokenSource, static () => {
|
PosixSignals.RegisterCancellation(cancellationTokenSource, static () => {
|
||||||
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server...");
|
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel server...");
|
||||||
@ -53,7 +53,7 @@ try {
|
|||||||
|
|
||||||
var (certificate, agentToken) = certificateData.Value;
|
var (certificate, agentToken) = certificateData.Value;
|
||||||
|
|
||||||
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), rpcServerHost, rpcServerPort, certificate);
|
var rpcConfiguration = new RpcConfiguration(PhantomLogger.Create("Rpc"), PhantomLogger.Create<TaskManager>("Rpc"), rpcServerHost, rpcServerPort, certificate);
|
||||||
var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token);
|
var webConfiguration = new WebConfiguration(PhantomLogger.Create("Web"), webServerHost, webServerPort, webBasePath, webKeysPath, cancellationTokenSource.Token);
|
||||||
|
|
||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
|
||||||
@ -79,7 +79,6 @@ try {
|
|||||||
} finally {
|
} finally {
|
||||||
cancellationTokenSource.Cancel();
|
cancellationTokenSource.Cancel();
|
||||||
|
|
||||||
PhantomLogger.Root.Information("Stopping task manager...");
|
|
||||||
await taskManager.Stop();
|
await taskManager.Stop();
|
||||||
|
|
||||||
cancellationTokenSource.Dispose();
|
cancellationTokenSource.Dispose();
|
||||||
|
@ -19,7 +19,7 @@ public abstract class MessageHandler<TListener> {
|
|||||||
|
|
||||||
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
taskManager.Run(async () => {
|
taskManager.Run("Handle message {Type}" + message.GetType().Name, async () => {
|
||||||
try {
|
try {
|
||||||
await Handle<TMessage, TReply>(sequenceId, message);
|
await Handle<TMessage, TReply>(sequenceId, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -3,6 +3,6 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
public sealed record RpcConfiguration(ILogger Logger, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
public sealed record RpcConfiguration(ILogger RuntimeLogger, ILogger TaskManagerLogger, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||||
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||||
}
|
}
|
||||||
|
@ -26,17 +26,17 @@ static class RpcRuntime {
|
|||||||
|
|
||||||
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
|
public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new() {
|
||||||
private readonly TSocket socket;
|
private readonly TSocket socket;
|
||||||
|
private readonly ILogger runtimeLogger;
|
||||||
private readonly MessageReplyTracker replyTracker;
|
private readonly MessageReplyTracker replyTracker;
|
||||||
private readonly TaskManager taskManager;
|
private readonly TaskManager taskManager;
|
||||||
private readonly ILogger logger;
|
|
||||||
|
|
||||||
protected RpcRuntime(TSocket socket, ILogger logger) {
|
protected RpcRuntime(RpcConfiguration configuration, TSocket socket) {
|
||||||
RpcRuntime.MarkRuntimeCreated();
|
RpcRuntime.MarkRuntimeCreated();
|
||||||
RpcRuntime.SetDefaultSocketOptions(socket.Options);
|
RpcRuntime.SetDefaultSocketOptions(socket.Options);
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.logger = logger;
|
this.runtimeLogger = configuration.RuntimeLogger;
|
||||||
this.replyTracker = new MessageReplyTracker(logger);
|
this.replyTracker = new MessageReplyTracker(runtimeLogger);
|
||||||
this.taskManager = new TaskManager();
|
this.taskManager = new TaskManager(configuration.TaskManagerLogger);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected async Task Launch() {
|
protected async Task Launch() {
|
||||||
@ -51,13 +51,12 @@ public abstract class RpcRuntime<TSocket> where TSocket : ThreadSafeSocket, new(
|
|||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// ignore
|
// ignore
|
||||||
} finally {
|
} finally {
|
||||||
logger.Information("Stopping task manager...");
|
|
||||||
await taskManager.Stop();
|
await taskManager.Stop();
|
||||||
await Disconnect();
|
await Disconnect();
|
||||||
|
|
||||||
socket.Dispose();
|
socket.Dispose();
|
||||||
NetMQConfig.Cleanup();
|
NetMQConfig.Cleanup();
|
||||||
logger.Information("ZeroMQ client stopped.");
|
runtimeLogger.Information("ZeroMQ client stopped.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,10 @@
|
|||||||
<ImplicitUsings>enable</ImplicitUsings>
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
<Nullable>enable</Nullable>
|
<Nullable>enable</Nullable>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Serilog" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" />
|
<ProjectReference Include="..\Phantom.Utils.Collections\Phantom.Utils.Collections.csproj" />
|
||||||
|
@ -1,21 +1,24 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using Phantom.Utils.Collections;
|
using Phantom.Utils.Collections;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Runtime;
|
namespace Phantom.Utils.Runtime;
|
||||||
|
|
||||||
public sealed class TaskManager {
|
public sealed class TaskManager {
|
||||||
|
private readonly ILogger logger;
|
||||||
private readonly CancellationTokenSource cancellationTokenSource = new ();
|
private readonly CancellationTokenSource cancellationTokenSource = new ();
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<Task, object?> runningTasks = new (ReferenceEqualityComparer<Task>.Instance);
|
private readonly ConcurrentDictionary<Task, string> runningTasks = new (ReferenceEqualityComparer<Task>.Instance);
|
||||||
|
|
||||||
public TaskManager() {
|
public TaskManager(ILogger logger) {
|
||||||
cancellationToken = cancellationTokenSource.Token;
|
this.logger = logger;
|
||||||
|
this.cancellationToken = cancellationTokenSource.Token;
|
||||||
}
|
}
|
||||||
|
|
||||||
private T Add<T>(T task) where T : Task {
|
private T Add<T>(string name, T task) where T : Task {
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
runningTasks.TryAdd(task, null);
|
runningTasks.TryAdd(task, name);
|
||||||
task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default);
|
task.ContinueWith(OnFinished, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
@ -24,21 +27,44 @@ public sealed class TaskManager {
|
|||||||
runningTasks.TryRemove(task, out _);
|
runningTasks.TryRemove(task, out _);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task Run(Action action) {
|
public Task Run(string name, Action action) {
|
||||||
return Add(Task.Run(action, cancellationToken));
|
return Add(name, Task.Run(action, cancellationToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task Run(Func<Task> taskFunc) {
|
public Task Run(string name, Func<Task> taskFunc) {
|
||||||
return Add(Task.Run(taskFunc, cancellationToken));
|
return Add(name, Task.Run(taskFunc, cancellationToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<T> Run<T>(Func<Task<T>> taskFunc) {
|
public Task<T> Run<T>(string name, Func<Task<T>> taskFunc) {
|
||||||
return Add(Task.Run(taskFunc, cancellationToken));
|
return Add(name, Task.Run(taskFunc, cancellationToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Stop() {
|
public async Task Stop() {
|
||||||
cancellationTokenSource.Cancel();
|
logger.Information("Stopping task manager...");
|
||||||
|
|
||||||
|
cancellationTokenSource.Cancel();
|
||||||
|
|
||||||
|
var remainingTasksAwaiterTask = WaitForRemainingTasks();
|
||||||
|
while (true) {
|
||||||
|
var logStateTimeoutTask = Task.Delay(TimeSpan.FromSeconds(10), CancellationToken.None);
|
||||||
|
var completedTask = await Task.WhenAny(remainingTasksAwaiterTask, logStateTimeoutTask);
|
||||||
|
if (completedTask == logStateTimeoutTask) {
|
||||||
|
var remainingTaskNames = runningTasks.Values.Order().ToList();
|
||||||
|
var remainingTaskNameList = string.Join('\n', remainingTaskNames.Select(static name => "- " + name));
|
||||||
|
logger.Warning("Waiting for {TaskCount} task(s) to finish:\n{TaskNames}", remainingTaskNames.Count, remainingTaskNameList);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runningTasks.Clear();
|
||||||
|
cancellationTokenSource.Dispose();
|
||||||
|
|
||||||
|
logger.Information("Task manager stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task WaitForRemainingTasks() {
|
||||||
foreach (var task in runningTasks.Keys) {
|
foreach (var task in runningTasks.Keys) {
|
||||||
try {
|
try {
|
||||||
await task;
|
await task;
|
||||||
@ -46,8 +72,5 @@ public sealed class TaskManager {
|
|||||||
// ignored
|
// ignored
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
runningTasks.Clear();
|
|
||||||
cancellationTokenSource.Dispose();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user