1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-19 09:42:50 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
d241ed9f2f
WIP 2024-03-08 16:31:15 +01:00
7eb72b7360
Implement actors in Controller via Akka.NET 2024-03-08 16:31:15 +01:00
54 changed files with 1466 additions and 710 deletions

View File

@ -0,0 +1,14 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Agent(
[property: MemoryPackOrder(0)] AgentConfiguration Configuration,
[property: MemoryPackOrder(1)] AgentStats? Stats,
[property: MemoryPackOrder(2)] IAgentConnectionStatus ConnectionStatus
) {
[MemoryPackIgnore]
public RamAllocationUnits? AvailableMemory => Configuration.MaxMemory - Stats?.RunningInstanceMemory;
}

View File

@ -0,0 +1,20 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentConfiguration(
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] string AgentName,
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances,
[property: MemoryPackOrder(5)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedServerPorts = null,
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts = null
) {
public static AgentConfiguration From(AgentInfo agentInfo) {
return new AgentConfiguration(agentInfo.AgentGuid, agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
}
}

View File

@ -1,22 +0,0 @@
using MemoryPack;
using Phantom.Common.Data.Agent;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentWithStats(
[property: MemoryPackOrder(0)] Guid Guid,
[property: MemoryPackOrder(1)] string Name,
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances,
[property: MemoryPackOrder(5)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedServerPorts,
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts,
[property: MemoryPackOrder(8)] AgentStats? Stats,
[property: MemoryPackOrder(9)] DateTimeOffset? LastPing,
[property: MemoryPackOrder(10)] bool IsOnline
) {
[MemoryPackIgnore]
public RamAllocationUnits? AvailableMemory => MaxMemory - Stats?.RunningInstanceMemory;
}

View File

@ -0,0 +1,27 @@
using MemoryPack;
namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable]
[MemoryPackUnion(0, typeof(AgentIsOffline))]
[MemoryPackUnion(1, typeof(AgentIsDisconnected))]
[MemoryPackUnion(2, typeof(AgentIsOnline))]
public partial interface IAgentConnectionStatus {}
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsOffline : IAgentConnectionStatus;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsDisconnected([property: MemoryPackOrder(0)] DateTimeOffset LastPingTime) : IAgentConnectionStatus;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentIsOnline : IAgentConnectionStatus;
public static class AgentConnectionStatus {
public static readonly IAgentConnectionStatus Offline = new AgentIsOffline();
public static readonly IAgentConnectionStatus Online = new AgentIsOnline();
public static IAgentConnectionStatus Disconnected(DateTimeOffset lastPingTime) {
return new AgentIsDisconnected(lastPingTime);
}
}

View File

@ -4,8 +4,8 @@ namespace Phantom.Common.Data.Agent;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentInfo( public sealed partial record AgentInfo(
[property: MemoryPackOrder(0)] Guid Guid, [property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] string Name, [property: MemoryPackOrder(1)] string AgentName,
[property: MemoryPackOrder(2)] ushort ProtocolVersion, [property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion, [property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances, [property: MemoryPackOrder(4)] ushort MaxInstances,

View File

@ -3,3 +3,12 @@
public enum ConfigureInstanceResult : byte { public enum ConfigureInstanceResult : byte {
Success Success
} }
public static class ConfigureInstanceResultExtensions {
public static string ToSentence(this ConfigureInstanceResult reason) {
return reason switch {
ConfigureInstanceResult.Success => "Success.",
_ => "Unknown error."
};
}
}

View File

@ -2,6 +2,7 @@
public enum InstanceActionGeneralResult : byte { public enum InstanceActionGeneralResult : byte {
None, None,
AgentDoesNotExist,
AgentShuttingDown, AgentShuttingDown,
AgentIsNotResponding, AgentIsNotResponding,
InstanceDoesNotExist InstanceDoesNotExist

View File

@ -18,6 +18,7 @@ public sealed partial record InstanceActionResult<T>(
public string ToSentence(Func<T, string> concreteResultToSentence) { public string ToSentence(Func<T, string> concreteResultToSentence) {
return GeneralResult switch { return GeneralResult switch {
InstanceActionGeneralResult.None => concreteResultToSentence(ConcreteResult!), InstanceActionGeneralResult.None => concreteResultToSentence(ConcreteResult!),
InstanceActionGeneralResult.AgentDoesNotExist => "Agent does not exist.",
InstanceActionGeneralResult.AgentShuttingDown => "Agent is shutting down.", InstanceActionGeneralResult.AgentShuttingDown => "Agent is shutting down.",
InstanceActionGeneralResult.AgentIsNotResponding => "Agent is not responding.", InstanceActionGeneralResult.AgentIsNotResponding => "Agent is not responding.",
InstanceActionGeneralResult.InstanceDoesNotExist => "Instance does not exist.", InstanceActionGeneralResult.InstanceDoesNotExist => "Instance does not exist.",

View File

@ -6,7 +6,8 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record LaunchInstanceMessage( public sealed partial record LaunchInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid, [property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid [property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] Guid InstanceGuid
) : IMessageToController<InstanceActionResult<LaunchInstanceResult>> { ) : IMessageToController<InstanceActionResult<LaunchInstanceResult>> {
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToControllerListener listener) { public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleLaunchInstance(this); return listener.HandleLaunchInstance(this);

View File

@ -6,8 +6,9 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record SendCommandToInstanceMessage( public sealed partial record SendCommandToInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid, [property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid, [property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] string Command [property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] string Command
) : IMessageToController<InstanceActionResult<SendCommandToInstanceResult>> { ) : IMessageToController<InstanceActionResult<SendCommandToInstanceResult>> {
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToControllerListener listener) { public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleSendCommandToInstance(this); return listener.HandleSendCommandToInstance(this);

View File

@ -7,8 +7,9 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record StopInstanceMessage( public sealed partial record StopInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid, [property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid, [property: MemoryPackOrder(1)] Guid AgentGuid,
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy [property: MemoryPackOrder(2)] Guid InstanceGuid,
[property: MemoryPackOrder(3)] MinecraftStopStrategy StopStrategy
) : IMessageToController<InstanceActionResult<StopInstanceResult>> { ) : IMessageToController<InstanceActionResult<StopInstanceResult>> {
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToControllerListener listener) { public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleStopInstance(this); return listener.HandleStopInstance(this);

View File

@ -7,7 +7,7 @@ namespace Phantom.Common.Messages.Web.ToWeb;
[MemoryPackable(GenerateType.VersionTolerant)] [MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record RefreshAgentsMessage( public sealed partial record RefreshAgentsMessage(
[property: MemoryPackOrder(0)] ImmutableArray<AgentWithStats> Agents [property: MemoryPackOrder(0)] ImmutableArray<Agent> Agents
) : IMessageToWeb { ) : IMessageToWeb {
public Task<NoReply> Accept(IMessageToWebListener listener) { public Task<NoReply> Accept(IMessageToWebListener listener) {
return listener.HandleRefreshAgents(this); return listener.HandleRefreshAgents(this);

View File

@ -1,39 +0,0 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
namespace Phantom.Controller.Services.Agents;
public sealed record Agent(
Guid Guid,
string Name,
ushort ProtocolVersion,
string BuildVersion,
ushort MaxInstances,
RamAllocationUnits MaxMemory,
AllowedPorts? AllowedServerPorts = null,
AllowedPorts? AllowedRconPorts = null,
AgentStats? Stats = null,
DateTimeOffset? LastPing = null
) {
internal AgentConnection? Connection { get; init; }
public bool IsOnline { get; internal init; }
public bool IsOffline => !IsOnline;
internal Agent(AgentInfo info) : this(info.Guid, info.Name, info.ProtocolVersion, info.BuildVersion, info.MaxInstances, info.MaxMemory, info.AllowedServerPorts, info.AllowedRconPorts) {}
internal Agent AsOnline(DateTimeOffset lastPing) => this with {
LastPing = lastPing,
IsOnline = Connection != null
};
internal Agent AsDisconnected() => this with {
IsOnline = false
};
internal Agent AsOffline() => this with {
Connection = null,
Stats = null,
IsOnline = false
};
}

View File

@ -0,0 +1,327 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentActor>();
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, TaskManager TaskManager, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
}
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private AgentConfiguration configuration;
private AgentStats? stats;
private ImmutableArray<TaggedJavaRuntime> javaRuntimes = ImmutableArray<TaggedJavaRuntime>.Empty;
private readonly AgentConnection connection;
private DateTimeOffset? lastPingTime;
private bool isOnline;
private IAgentConnectionStatus ConnectionStatus {
get {
if (isOnline) {
return AgentConnectionStatus.Online;
}
else if (lastPingTime == null) {
return AgentConnectionStatus.Offline;
}
else {
return AgentConnectionStatus.Disconnected(lastPingTime.Value);
}
}
}
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
private AgentActor(Init init) {
this.controllerState = init.ControllerState;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.taskManager = init.TaskManager;
this.cancellationToken = init.CancellationToken;
this.configuration = init.Configuration;
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
NotifyAgentUpdated();
ReceiveAsync<InitializeCommand>(Initialize);
ReceiveAndReply<RegisterCommand, ImmutableArray<Instance>>(Register);
Receive<UnregisterCommand>(Unregister);
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
Receive<UpdateStatsCommand>(UpdateStats);
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
ReceiveAsyncAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
Receive<LaunchInstanceCommand>(LaunchInstance);
Receive<StopInstanceCommand>(StopInstance);
Receive<SendMinecraftCommandCommand>(SendMinecraftCommand);
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
}
private void NotifyAgentUpdated() {
controllerState.UpdateAgent(new Agent(configuration, stats, ConnectionStatus));
}
protected override void PreStart() {
Self.Tell(new InitializeCommand());
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
UpdateInstanceData(instance);
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instance.Configuration.InstanceGuid, instanceActor);
return instanceActor;
}
private void UpdateInstanceData(Instance instance) {
instanceDataByGuid[instance.Configuration.InstanceGuid] = instance;
controllerState.UpdateInstance(instance);
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Forward(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
public interface ICommand {}
private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<Instance>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
private sealed record RefreshConnectionStatusCommand : ICommand;
public sealed record NotifyIsAliveCommand : ICommand;
public sealed record UpdateStatsCommand(int RunningInstanceCount, RamAllocationUnits RunningInstanceMemory) : ICommand;
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
private async Task Initialize(InitializeCommand command) {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == configuration.AgentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var instanceConfiguration = new InstanceConfiguration(
entity.AgentGuid,
entity.InstanceGuid,
entity.InstanceName,
entity.ServerPort,
entity.RconPort,
entity.MinecraftVersion,
entity.MinecraftServerKind,
entity.MemoryAllocation,
entity.JavaRuntimeGuid,
JvmArgumentsHelper.Split(entity.JvmArguments)
);
CreateNewInstance(Instance.Offline(instanceConfiguration, entity.LaunchAutomatically));
}
}
private ImmutableArray<Instance> Register(RegisterCommand command) {
configuration = command.Configuration;
connection.UpdateConnection(command.Connection, configuration.AgentName);
lastPingTime = DateTimeOffset.Now;
isOnline = true;
NotifyAgentUpdated();
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
return instanceDataByGuid.Values.ToImmutableArray();
}
private void Unregister(UnregisterCommand command) {
if (connection.CloseIfSame(command.Connection)) {
stats = null;
lastPingTime = null;
isOnline = false;
NotifyAgentUpdated();
var setStatusCommand = new InstanceActor.SetStatusCommand(InstanceStatus.Offline);
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(setStatusCommand);
}
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
}
}
private void RefreshConnectionStatus(RefreshConnectionStatusCommand command) {
if (isOnline && lastPingTime != null && DateTimeOffset.Now - lastPingTime >= DisconnectionThreshold) {
isOnline = false;
NotifyAgentUpdated();
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
}
}
private void NotifyIsAlive(NotifyIsAliveCommand command) {
lastPingTime = DateTimeOffset.Now;
if (!isOnline) {
isOnline = true;
NotifyAgentUpdated();
}
}
private void UpdateStats(UpdateStatsCommand command) {
stats = new AgentStats(command.RunningInstanceCount, command.RunningInstanceMemory);
NotifyAgentUpdated();
}
private void UpdateJavaRuntimes(UpdateJavaRuntimesCommand command) {
javaRuntimes = command.JavaRuntimes;
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
}
private async Task CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2) {
var instanceConfiguration = command.Configuration;
var instanceName = instanceConfiguration.InstanceName;
var instanceGuid = instanceConfiguration.InstanceGuid;
if (string.IsNullOrWhiteSpace(instanceName)) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
return;
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
return;
}
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
if (serverExecutableInfo == null) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
return;
}
bool isNewInstance = !instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
if (isNewInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isNewInstance);
taskManager.Run("Configure instance " + instanceGuid, async () => await ConfigureInstance(instanceActorRef, configureInstanceCommand, configuration.AgentName, result2, cancellationToken));
}
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
private static async Task ConfigureInstance(ActorRef<InstanceActor.ICommand> instanceActorRef, InstanceActor.ConfigureInstanceCommand command, string agentName, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2, CancellationToken cancellationToken) {
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var result = await instanceActorRef.Request(command, cancellationToken);
if (result.Is(ConfigureInstanceResult.Success)) {
if (command.IsNewInstance) {
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
else {
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
}
else {
if (command.IsNewInstance) {
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
else {
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
}
result2.SetResult(result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
}));
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private void LaunchInstance(LaunchInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private void StopInstance(StopInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private void SendMinecraftCommand(SendMinecraftCommandCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(command.AuditLogUserGuid, command.Command));
}
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
UpdateInstanceData(command.Instance);
}
}

View File

@ -1,28 +1,65 @@
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Serilog;
namespace Phantom.Controller.Services.Agents; namespace Phantom.Controller.Services.Agents;
sealed class AgentConnection { sealed class AgentConnection {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection; private static readonly ILogger Logger = PhantomLogger.Create<AgentConnection>();
internal AgentConnection(RpcConnectionToClient<IMessageToAgentListener> connection) { private readonly Guid agentGuid;
this.connection = connection; private string agentName;
private RpcConnectionToClient<IMessageToAgentListener>? connection;
public AgentConnection(Guid agentGuid, string agentName) {
this.agentName = agentName;
this.agentGuid = agentGuid;
} }
public bool IsSame(RpcConnectionToClient<IMessageToAgentListener> connection) { public void UpdateConnection(RpcConnectionToClient<IMessageToAgentListener> newConnection, string newAgentName) {
return this.connection.IsSame(connection); lock (this) {
connection?.Close();
connection = newConnection;
agentName = newAgentName;
}
} }
public void Close() { public bool CloseIfSame(RpcConnectionToClient<IMessageToAgentListener> expected) {
connection.Close(); lock (this) {
if (connection != null && connection.IsSame(expected)) {
connection.Close();
return true;
}
}
return false;
} }
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent { public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
return connection.Send(message); lock (this) {
if (connection == null) {
LogAgentOffline();
return Task.CompletedTask;
}
return connection.Send(message);
}
} }
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class { public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken); lock (this) {
if (connection == null) {
LogAgentOffline();
return Task.FromResult<TReply?>(default);
}
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken)!;
}
}
private void LogAgentOffline() {
Logger.Error("Could not send message to offline agent \"{Name}\" (GUID {Guid}).", agentName, agentGuid);
} }
} }

View File

@ -0,0 +1,82 @@
using Phantom.Common.Data;
using Phantom.Controller.Database;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentDatabaseStorageActor>();
public readonly record struct Init(Guid AgentGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly Guid agentGuid;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private StoreAgentCommand? lastStoreCommand;
private bool hasScheduledFlush;
private AgentDatabaseStorageActor(Init init) {
this.agentGuid = init.AgentGuid;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreAgentCommand>(StoreAgent);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreAgentCommand(string Name, ushort ProtocolVersion, string BuildVersion, ushort MaxInstances, RamAllocationUnits MaxMemory) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreAgent(StoreAgentCommand command) {
this.lastStoreCommand = command;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (lastStoreCommand == null) {
return;
}
try {
await using var ctx = dbProvider.Eager();
var entity = ctx.AgentUpsert.Fetch(agentGuid);
entity.Name = lastStoreCommand.Name;
entity.ProtocolVersion = lastStoreCommand.ProtocolVersion;
entity.BuildVersion = lastStoreCommand.BuildVersion;
entity.MaxInstances = lastStoreCommand.MaxInstances;
entity.MaxMemory = lastStoreCommand.MaxMemory;
await ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
return;
}
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
lastStoreCommand = null;
}
private void ScheduleFlush(TimeSpan delay) {
if (hasScheduledFlush) {
return;
}
hasScheduledFlush = true;
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
}
}

View File

@ -1,15 +0,0 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Utils.Collections;
namespace Phantom.Controller.Services.Agents;
sealed class AgentJavaRuntimesManager {
private readonly RwLockedDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> runtimes = new (LockRecursionPolicy.NoRecursion);
public ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> All => runtimes.ToImmutable();
internal void Update(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
this.runtimes[agentGuid] = runtimes;
}
}

View File

@ -1,16 +1,19 @@
using System.Collections.Immutable; using System.Collections.Concurrent;
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Minecraft;
using Phantom.Utils.Collections; using Phantom.Utils.Actor;
using Phantom.Utils.Events;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Agents; namespace Phantom.Controller.Services.Agents;
@ -18,136 +21,87 @@ namespace Phantom.Controller.Services.Agents;
sealed class AgentManager { sealed class AgentManager {
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>(); private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5); private readonly ActorSystem actorSystem;
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
private readonly ObservableAgents agents = new (PhantomLogger.Create<AgentManager, ObservableAgents>());
public EventSubscribers<ImmutableArray<Agent>> AgentsChanged => agents.Subs;
private readonly CancellationToken cancellationToken;
private readonly AuthToken authToken; private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider; private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
public AgentManager(AuthToken authToken, IDbContextProvider dbProvider, TaskManager taskManager, CancellationToken cancellationToken) { private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
public AgentManager(ActorSystem actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.actorSystem = actorSystem;
this.authToken = authToken; this.authToken = authToken;
this.controllerState = controllerState;
this.minecraftVersions = minecraftVersions;
this.dbProvider = dbProvider; this.dbProvider = dbProvider;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
taskManager.Run("Refresh agent status loop", RefreshAgentStatus);
addAgentActorFactory = (_, agent) => CreateAgentActor(agent);
} }
internal async Task Initialize() { private ActorRef<AgentActor.ICommand> CreateAgentActor(AgentConfiguration agentConfiguration) {
var init = new AgentActor.Init(agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
var name = "Agent:" + agentConfiguration.AgentGuid;
return actorSystem.ActorOf(AgentActor.Factory(init), name);
}
public async Task Initialize() {
await using var ctx = dbProvider.Eager(); await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Agents.AsAsyncEnumerable().WithCancellation(cancellationToken)) { await foreach (var entity in ctx.Agents.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var agent = new Agent(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory); var agentProperties = new AgentConfiguration(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
if (!agents.ByGuid.AddOrReplaceIf(agent.Guid, agent, static oldAgent => oldAgent.IsOffline)) {
// TODO if (agentsByGuid.TryAdd(entity.AgentGuid, CreateAgentActor(agentProperties))) {
throw new InvalidOperationException("Unable to register agent from database: " + agent.Guid); Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentProperties.AgentName, agentProperties.AgentGuid);
} }
} }
} }
public ImmutableDictionary<Guid, Agent> GetAgents() { public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgentListener> connection) {
return agents.ByGuid.ToImmutable();
}
internal async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, InstanceManager instanceManager, RpcConnectionToClient<IMessageToAgentListener> connection) {
if (!this.authToken.FixedTimeEquals(authToken)) { if (!this.authToken.FixedTimeEquals(authToken)) {
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.InvalidToken));
return false; return false;
} }
var agent = new Agent(agentInfo) { var agentProperties = AgentConfiguration.From(agentInfo);
LastPing = DateTimeOffset.Now, var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
IsOnline = true, var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
Connection = new AgentConnection(connection) await connection.Send(new RegisterAgentSuccessMessage(await GetInstanceConfigurationsForAgent(agentInstances)));
};
if (agents.ByGuid.AddOrReplace(agent.Guid, agent, out var oldAgent)) {
oldAgent.Connection?.Close();
}
await using (var ctx = dbProvider.Eager()) {
var entity = ctx.AgentUpsert.Fetch(agent.Guid);
entity.Name = agent.Name;
entity.ProtocolVersion = agent.ProtocolVersion;
entity.BuildVersion = agent.BuildVersion;
entity.MaxInstances = agent.MaxInstances;
entity.MaxMemory = agent.MaxMemory;
await ctx.SaveChangesAsync(cancellationToken);
}
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
var instanceConfigurations = await instanceManager.GetInstanceConfigurationsForAgent(agent.Guid);
await connection.Send(new RegisterAgentSuccessMessage(instanceConfigurations));
return true; return true;
} }
internal bool UnregisterAgent(Guid agentGuid, RpcConnectionToClient<IMessageToAgentListener> connection) { private async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(ImmutableArray<Instance> instances) {
if (agents.ByGuid.TryReplaceIf(agentGuid, static oldAgent => oldAgent.AsOffline(), oldAgent => oldAgent.Connection?.IsSame(connection) == true)) { var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
Logger.Information("Unregistered agent with GUID {Guid}.", agentGuid);
foreach (var (configuration, _, launchAutomatically) in instances) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
agent.Tell(command);
return true; return true;
} }
else { else {
Logger.Warning("Could not deliver command {CommandType} to agent {AgentGuid}, agent not registered.", command.GetType().Name, agentGuid);
return false; return false;
} }
} }
internal Agent? GetAgent(Guid guid) { public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
return agents.ByGuid.TryGetValue(guid, out var agent) ? agent : null; if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
} return await agent.Request(command, cancellationToken);
internal void NotifyAgentIsAlive(Guid agentGuid) {
agents.ByGuid.TryReplace(agentGuid, static agent => agent.AsOnline(DateTimeOffset.Now));
}
internal void SetAgentStats(Guid agentGuid, int runningInstanceCount, RamAllocationUnits runningInstanceMemory) {
agents.ByGuid.TryReplace(agentGuid, agent => agent with { Stats = new AgentStats(runningInstanceCount, runningInstanceMemory) });
}
private async Task RefreshAgentStatus() {
static Agent MarkAgentAsOffline(Agent agent) {
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
return agent.AsDisconnected();
} }
else {
while (!cancellationToken.IsCancellationRequested) { return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
await Task.Delay(DisconnectionRecheckInterval, cancellationToken);
var now = DateTimeOffset.Now;
agents.ByGuid.ReplaceAllIf(MarkAgentAsOffline, agent => agent.IsOnline && agent.LastPing is {} lastPing && now - lastPing >= DisconnectionThreshold);
}
}
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
if (connection == null || agent == null) {
// TODO handle missing agent?
return null;
}
try {
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
} catch (Exception e) {
Logger.Error(e, "Could not send message to agent \"{Name}\" (GUID {Guid}).", agent.Name, agent.Guid);
return null;
}
}
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
public RwLockedObservableDictionary<Guid, Agent> ByGuid { get; } = new (LockRecursionPolicy.NoRecursion);
public ObservableAgents(ILogger logger) : base(logger) {
ByGuid.CollectionChanged += Update;
}
protected override ImmutableArray<Agent> GetData() {
return ByGuid.ValuesCopy;
} }
} }
} }

View File

@ -1,4 +1,5 @@
using Phantom.Common.Data; using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Web; using Phantom.Common.Messages.Web;
using Phantom.Controller.Database; using Phantom.Controller.Database;
@ -8,19 +9,19 @@ using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Rpc; using Phantom.Controller.Services.Rpc;
using Phantom.Controller.Services.Users; using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks; using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services; namespace Phantom.Controller.Services;
public sealed class ControllerServices { public sealed class ControllerServices : IAsyncDisposable {
private TaskManager TaskManager { get; } private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; } private MinecraftVersions MinecraftVersions { get; }
private AgentManager AgentManager { get; } private AgentManager AgentManager { get; }
private AgentJavaRuntimesManager AgentJavaRuntimesManager { get; }
private InstanceManager InstanceManager { get; }
private InstanceLogManager InstanceLogManager { get; } private InstanceLogManager InstanceLogManager { get; }
private EventLogManager EventLogManager { get; } private EventLogManager EventLogManager { get; }
@ -32,17 +33,23 @@ public sealed class ControllerServices {
private UserLoginManager UserLoginManager { get; } private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; } private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
private readonly IDbContextProvider dbProvider; private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken; private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) { public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) {
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>()); this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions(); this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(agentAuthToken, dbProvider, TaskManager, shutdownCancellationToken); this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.AgentJavaRuntimesManager = new AgentJavaRuntimesManager();
this.InstanceManager = new InstanceManager(AgentManager, MinecraftVersions, dbProvider, shutdownCancellationToken);
this.InstanceLogManager = new InstanceLogManager(); this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider); this.UserManager = new UserManager(dbProvider);
@ -53,25 +60,25 @@ public sealed class ControllerServices {
this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager); this.UserLoginManager = new UserLoginManager(UserManager, PermissionManager);
this.AuditLogManager = new AuditLogManager(dbProvider); this.AuditLogManager = new AuditLogManager(dbProvider);
this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken); this.EventLogManager = new EventLogManager(dbProvider, TaskManager, shutdownCancellationToken);
this.dbProvider = dbProvider;
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
} }
public AgentMessageListener CreateAgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection) { public AgentMessageListener CreateAgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection) {
return new AgentMessageListener(connection, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, EventLogManager, cancellationToken); return new AgentMessageListener(connection, AgentManager, InstanceLogManager, EventLogManager, cancellationToken);
} }
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) { public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(connection, webAuthToken, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, AgentJavaRuntimesManager, InstanceManager, InstanceLogManager, MinecraftVersions, EventLogManager, TaskManager); return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
} }
public async Task Initialize() { public async Task Initialize() {
await DatabaseMigrator.Run(dbProvider, cancellationToken); await DatabaseMigrator.Run(dbProvider, cancellationToken);
await AgentManager.Initialize();
await PermissionManager.Initialize(); await PermissionManager.Initialize();
await RoleManager.Initialize(); await RoleManager.Initialize();
await AgentManager.Initialize(); }
await InstanceManager.Initialize();
public async ValueTask DisposeAsync() {
await actorSystem.Terminate();
actorSystem.Dispose();
} }
} }

View File

@ -0,0 +1,33 @@
using System.Collections.Immutable;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Utils.Actor.Event;
namespace Phantom.Controller.Services;
sealed class ControllerState {
private readonly ObservableState<ImmutableDictionary<Guid, Agent>> agentsByGuid = new (ImmutableDictionary<Guid, Agent>.Empty);
private readonly ObservableState<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> agentJavaRuntimesByGuid = new (ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>.Empty);
private readonly ObservableState<ImmutableDictionary<Guid, Instance>> instancesByGuid = new (ImmutableDictionary<Guid, Instance>.Empty);
public ImmutableDictionary<Guid, Agent> AgentsByGuid => agentsByGuid.State;
public ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> AgentJavaRuntimesByGuid => agentJavaRuntimesByGuid.State;
public ImmutableDictionary<Guid, Instance> InstancesByGuid => instancesByGuid.State;
public ObservableState<ImmutableDictionary<Guid, Agent>>.Receiver AgentsByGuidReceiver => agentsByGuid.ReceiverSide;
public ObservableState<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>>.Receiver AgentJavaRuntimesByGuidReceiver => agentJavaRuntimesByGuid.ReceiverSide;
public ObservableState<ImmutableDictionary<Guid, Instance>>.Receiver InstancesByGuidReceiver => instancesByGuid.ReceiverSide;
public void UpdateAgent(Agent agent) {
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.Configuration.AgentGuid, agent), agent);
}
public void UpdateAgentJavaRuntimes(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
agentJavaRuntimesByGuid.PublisherSide.Publish(static (agentJavaRuntimesByGuid, agentGuid, runtimes) => agentJavaRuntimesByGuid.SetItem(agentGuid, runtimes), agentGuid, runtimes);
}
public void UpdateInstance(Instance instance) {
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.Configuration.InstanceGuid, instance), instance);
}
}

View File

@ -0,0 +1,165 @@
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Controller.Services.Agents;
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private Guid InstanceGuid => configuration.InstanceGuid;
private InstanceConfiguration configuration;
private IInstanceStatus status;
private bool launchAutomatically;
private InstanceActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
var instance = init.Instance;
this.configuration = instance.Configuration;
this.status = instance.Status;
this.launchAutomatically = instance.LaunchAutomatically;
Receive<SetStatusCommand>(SetStatus);
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAsyncAndReply<SendMinecraftCommandCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private void NotifyInstanceUpdated() {
var instance = new Instance(configuration, status, launchAutomatically);
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(instance));
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
var reply = await agentConnection.Send<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
return reply.DidNotReplyIfNull();
}
public interface ICommand {}
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsNewInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void SetStatus(SetStatusCommand command) {
status = command.Status;
NotifyInstanceUpdated();
}
private async Task<InstanceActionResult<ConfigureInstanceResult>> ConfigureInstance(ConfigureInstanceCommand command) {
var message = new ConfigureInstanceMessage(command.Configuration, command.LaunchProperties);
var result = await SendInstanceActionMessage<ConfigureInstanceMessage, ConfigureInstanceResult>(message);
if (result.Is(ConfigureInstanceResult.Success)) {
configuration = command.Configuration;
NotifyInstanceUpdated();
await using var db = dbProvider.Lazy();
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsNewInstance) {
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
}
else {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
}
return result;
}
private async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
var message = new LaunchInstanceMessage(InstanceGuid);
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(message);
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(true, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceLaunched(InstanceGuid));
}
return result;
}
private async Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
var message = new StopInstanceMessage(InstanceGuid, command.StopStrategy);
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(message);
if (result.Is(StopInstanceResult.StopInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(false, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceStopped(InstanceGuid, command.StopStrategy.Seconds));
}
return result;
}
private async Task HandleInstanceManuallyLaunchedOrStopped(bool wasLaunched, Guid auditLogUserGuid, Action<AuditLogRepository.ItemWriter> addAuditEvent) {
if (launchAutomatically != wasLaunched) {
launchAutomatically = wasLaunched;
NotifyInstanceUpdated();
}
await using var db = dbProvider.Lazy();
var entity = await db.Ctx.Instances.FindAsync(new object[] { InstanceGuid }, cancellationToken);
if (entity != null) {
entity.LaunchAutomatically = wasLaunched;
addAuditEvent(new AuditLogRepository(db).Writer(auditLogUserGuid));
await db.Ctx.SaveChangesAsync(cancellationToken);
}
}
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendMinecraftCommandCommand command) {
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
if (result.Is(SendCommandToInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceCommandExecuted(InstanceGuid, command.Command);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
return result;
}
}

View File

@ -0,0 +1,9 @@
using Phantom.Utils.Actor;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceDatabaseStorageActor : ReceiveActor<InstanceDatabaseStorageActor.ICommand> {
public interface ICommand {}
public sealed record StoreInstanceCommand() : ICommand;
}

View File

@ -1,241 +0,0 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Agents;
using Phantom.Utils.Collections;
using Phantom.Utils.Events;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceManager {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceManager>();
private readonly ObservableInstances instances = new (PhantomLogger.Create<InstanceManager, ObservableInstances>());
public EventSubscribers<ImmutableDictionary<Guid, Instance>> InstancesChanged => instances.Subs;
private readonly AgentManager agentManager;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly SemaphoreSlim modifyInstancesSemaphore = new (1, 1);
public InstanceManager(AgentManager agentManager, MinecraftVersions minecraftVersions, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
this.agentManager = agentManager;
this.minecraftVersions = minecraftVersions;
this.dbProvider = dbProvider;
this.cancellationToken = cancellationToken;
}
public async Task Initialize() {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Instances.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var configuration = new InstanceConfiguration(
entity.AgentGuid,
entity.InstanceGuid,
entity.InstanceName,
entity.ServerPort,
entity.RconPort,
entity.MinecraftVersion,
entity.MinecraftServerKind,
entity.MemoryAllocation,
entity.JavaRuntimeGuid,
JvmArgumentsHelper.Split(entity.JvmArguments)
);
var instance = Instance.Offline(configuration, entity.LaunchAutomatically);
instances.ByGuid[instance.Configuration.InstanceGuid] = instance;
}
}
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
public async Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid auditLogUserGuid, InstanceConfiguration configuration) {
var agent = agentManager.GetAgent(configuration.AgentGuid);
if (agent == null) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.AgentNotFound);
}
if (string.IsNullOrWhiteSpace(configuration.InstanceName)) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty);
}
if (configuration.MemoryAllocation <= RamAllocationUnits.Zero) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero);
}
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
if (serverExecutableInfo == null) {
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound);
}
InstanceActionResult<CreateOrUpdateInstanceResult> result;
bool isNewInstance;
await modifyInstancesSemaphore.WaitAsync(cancellationToken);
try {
isNewInstance = !instances.ByGuid.TryReplace(configuration.InstanceGuid, instance => instance with { Configuration = configuration });
if (isNewInstance) {
instances.ByGuid.TryAdd(configuration.InstanceGuid, Instance.Offline(configuration));
}
var message = new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo));
var reply = await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
result = reply.DidNotReplyIfNull().Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(auditLogUserGuid);
if (isNewInstance) {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
}
else {
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
}
else if (isNewInstance) {
instances.ByGuid.Remove(configuration.InstanceGuid);
}
} finally {
modifyInstancesSemaphore.Release();
}
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
if (isNewInstance) {
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agent.Name);
}
else {
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", configuration.InstanceName, configuration.InstanceGuid, agent.Name);
}
}
else {
if (isNewInstance) {
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agent.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
}
else {
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", configuration.InstanceName, configuration.InstanceGuid, agent.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
}
}
return result;
}
internal void SetInstanceState(Guid instanceGuid, IInstanceStatus instanceStatus) {
instances.ByGuid.TryReplace(instanceGuid, instance => instance with { Status = instanceStatus });
}
internal void SetInstanceStatesForAgent(Guid agentGuid, IInstanceStatus instanceStatus) {
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Instance instance, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
var reply = await agentManager.SendMessage<TMessage, InstanceActionResult<TReply>>(instance.Configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
return reply.DidNotReplyIfNull();
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Guid instanceGuid, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
return instances.ByGuid.TryGetValue(instanceGuid, out var instance) ? await SendInstanceActionMessage<TMessage, TReply>(instance, message) : InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist);
}
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid auditLogUserGuid, Guid instanceGuid) {
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(instanceGuid, new LaunchInstanceMessage(instanceGuid));
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(instanceGuid, true, auditLogUserGuid, auditLogWriter => auditLogWriter.InstanceLaunched(instanceGuid));
}
return result;
}
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid auditLogUserGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(instanceGuid, new StopInstanceMessage(instanceGuid, stopStrategy));
if (result.Is(StopInstanceResult.StopInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(instanceGuid, false, auditLogUserGuid, auditLogWriter => auditLogWriter.InstanceStopped(instanceGuid, stopStrategy.Seconds));
}
return result;
}
private async Task HandleInstanceManuallyLaunchedOrStopped(Guid instanceGuid, bool wasLaunched, Guid auditLogUserGuid, Action<AuditLogRepository.ItemWriter> addAuditEvent) {
await modifyInstancesSemaphore.WaitAsync(cancellationToken);
try {
instances.ByGuid.TryReplace(instanceGuid, instance => instance with { LaunchAutomatically = wasLaunched });
await using var db = dbProvider.Lazy();
var entity = await db.Ctx.Instances.FindAsync(new object[] { instanceGuid }, cancellationToken);
if (entity != null) {
entity.LaunchAutomatically = wasLaunched;
addAuditEvent(new AuditLogRepository(db).Writer(auditLogUserGuid));
await db.Ctx.SaveChangesAsync(cancellationToken);
}
} finally {
modifyInstancesSemaphore.Release();
}
}
public async Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommand(Guid auditLogUserId, Guid instanceGuid, string command) {
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(instanceGuid, new SendCommandToInstanceMessage(instanceGuid, command));
if (result.Is(SendCommandToInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(auditLogUserId);
auditLogWriter.InstanceCommandExecuted(instanceGuid, command);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
return result;
}
internal async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(Guid agentGuid) {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (configuration, _, launchAutomatically) in instances.ByGuid.ValuesCopy.Where(instance => instance.Configuration.AgentGuid == agentGuid)) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
private sealed class ObservableInstances : ObservableState<ImmutableDictionary<Guid, Instance>> {
public RwLockedObservableDictionary<Guid, Instance> ByGuid { get; } = new (LockRecursionPolicy.NoRecursion);
public ObservableInstances(ILogger logger) : base(logger) {
ByGuid.CollectionChanged += Update;
}
protected override ImmutableDictionary<Guid, Instance> GetData() {
return ByGuid.ToImmutable();
}
}
}

View File

@ -6,6 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Akka" />
<PackageReference Include="BCrypt.Net-Next.StrongName" /> <PackageReference Include="BCrypt.Net-Next.StrongName" />
</ItemGroup> </ItemGroup>
@ -14,6 +15,7 @@
<ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Data\Phantom.Common.Data.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Agent\Phantom.Common.Messages.Agent.csproj" />
<ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" /> <ProjectReference Include="..\..\Common\Phantom.Common.Messages.Web\Phantom.Common.Messages.Web.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\..\Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj" /> <ProjectReference Include="..\..\Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj" />
<ProjectReference Include="..\Phantom.Controller.Database\Phantom.Controller.Database.csproj" /> <ProjectReference Include="..\Phantom.Controller.Database\Phantom.Controller.Database.csproj" />
<ProjectReference Include="..\Phantom.Controller.Minecraft\Phantom.Controller.Minecraft.csproj" /> <ProjectReference Include="..\Phantom.Controller.Minecraft\Phantom.Controller.Minecraft.csproj" />

View File

@ -1,5 +1,4 @@
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Replies;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.BiDirectional; using Phantom.Common.Messages.Agent.BiDirectional;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
@ -16,32 +15,28 @@ namespace Phantom.Controller.Services.Rpc;
public sealed class AgentMessageListener : IMessageToControllerListener { public sealed class AgentMessageListener : IMessageToControllerListener {
private readonly RpcConnectionToClient<IMessageToAgentListener> connection; private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
private readonly AgentManager agentManager; private readonly AgentManager agentManager;
private readonly AgentJavaRuntimesManager agentJavaRuntimesManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager; private readonly InstanceLogManager instanceLogManager;
private readonly EventLogManager eventLogManager; private readonly EventLogManager eventLogManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>(); private readonly TaskCompletionSource<Guid> agentGuidWaiter = AsyncTasks.CreateCompletionSource<Guid>();
internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, AgentJavaRuntimesManager agentJavaRuntimesManager, InstanceManager instanceManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) { internal AgentMessageListener(RpcConnectionToClient<IMessageToAgentListener> connection, AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager, CancellationToken cancellationToken) {
this.connection = connection; this.connection = connection;
this.agentManager = agentManager; this.agentManager = agentManager;
this.agentJavaRuntimesManager = agentJavaRuntimesManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager; this.instanceLogManager = instanceLogManager;
this.eventLogManager = eventLogManager; this.eventLogManager = eventLogManager;
this.cancellationToken = cancellationToken; this.cancellationToken = cancellationToken;
} }
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) { public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) { if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.AgentGuid) {
connection.SetAuthorizationResult(false); connection.SetAuthorizationResult(false);
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent)); await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
} }
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, instanceManager, connection)) { else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection)) {
connection.SetAuthorizationResult(true); connection.SetAuthorizationResult(true);
agentGuidWaiter.SetResult(message.AgentInfo.Guid); agentGuidWaiter.SetResult(message.AgentInfo.AgentGuid);
} }
return NoReply.Instance; return NoReply.Instance;
@ -53,10 +48,7 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) { public Task<NoReply> HandleUnregisterAgent(UnregisterAgentMessage message) {
if (agentGuidWaiter.Task.IsCompleted) { if (agentGuidWaiter.Task.IsCompleted) {
var agentGuid = agentGuidWaiter.Task.Result; agentManager.TellAgent(agentGuidWaiter.Task.Result, new AgentActor.UnregisterCommand(connection));
if (agentManager.UnregisterAgent(agentGuid, connection)) {
instanceManager.SetInstanceStatesForAgent(agentGuid, InstanceStatus.Offline);
}
} }
connection.Close(); connection.Close();
@ -64,23 +56,23 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
} }
public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) { public async Task<NoReply> HandleAgentIsAlive(AgentIsAliveMessage message) {
agentManager.NotifyAgentIsAlive(await WaitForAgentGuid()); agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.NotifyIsAliveCommand());
return NoReply.Instance; return NoReply.Instance;
} }
public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) { public async Task<NoReply> HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
agentJavaRuntimesManager.Update(await WaitForAgentGuid(), message.Runtimes); agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
return NoReply.Instance; return NoReply.Instance;
} }
public async Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message) { public async Task<NoReply> HandleReportAgentStatus(ReportAgentStatusMessage message) {
agentManager.SetAgentStats(await WaitForAgentGuid(), message.RunningInstanceCount, message.RunningInstanceMemory); agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
return NoReply.Instance; return NoReply.Instance;
} }
public Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) { public async Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
instanceManager.SetInstanceState(message.InstanceGuid, message.InstanceStatus); agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
return Task.FromResult(NoReply.Instance); return NoReply.Instance;
} }
public async Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message) { public async Task<NoReply> HandleReportInstanceEvent(ReportInstanceEventMessage message) {

View File

@ -1,9 +1,9 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Java; using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft; using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.AuditLog; using Phantom.Common.Data.Web.AuditLog;
using Phantom.Common.Data.Web.EventLog; using Phantom.Common.Data.Web.EventLog;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
@ -17,93 +17,118 @@ using Phantom.Controller.Services.Agents;
using Phantom.Controller.Services.Events; using Phantom.Controller.Services.Events;
using Phantom.Controller.Services.Instances; using Phantom.Controller.Services.Instances;
using Phantom.Controller.Services.Users; using Phantom.Controller.Services.Users;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
using Agent = Phantom.Common.Data.Web.Agent.Agent;
namespace Phantom.Controller.Services.Rpc; namespace Phantom.Controller.Services.Rpc;
public sealed class WebMessageListener : IMessageToControllerListener { public sealed class WebMessageListener : IMessageToControllerListener {
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>(); private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
private static int listenerSequenceId = 0;
private readonly ActorRef<ICommand> actor;
private readonly RpcConnectionToClient<IMessageToWebListener> connection; private readonly RpcConnectionToClient<IMessageToWebListener> connection;
private readonly AuthToken authToken; private readonly AuthToken authToken;
private readonly ControllerState controllerState;
private readonly UserManager userManager; private readonly UserManager userManager;
private readonly RoleManager roleManager; private readonly RoleManager roleManager;
private readonly UserRoleManager userRoleManager; private readonly UserRoleManager userRoleManager;
private readonly UserLoginManager userLoginManager; private readonly UserLoginManager userLoginManager;
private readonly AuditLogManager auditLogManager; private readonly AuditLogManager auditLogManager;
private readonly AgentManager agentManager; private readonly AgentManager agentManager;
private readonly AgentJavaRuntimesManager agentJavaRuntimesManager;
private readonly InstanceManager instanceManager;
private readonly InstanceLogManager instanceLogManager; private readonly InstanceLogManager instanceLogManager;
private readonly MinecraftVersions minecraftVersions; private readonly MinecraftVersions minecraftVersions;
private readonly EventLogManager eventLogManager; private readonly EventLogManager eventLogManager;
private readonly TaskManager taskManager;
internal WebMessageListener( internal WebMessageListener(
IActorRefFactory actorSystem,
RpcConnectionToClient<IMessageToWebListener> connection, RpcConnectionToClient<IMessageToWebListener> connection,
AuthToken authToken, AuthToken authToken,
ControllerState controllerState,
UserManager userManager, UserManager userManager,
RoleManager roleManager, RoleManager roleManager,
UserRoleManager userRoleManager, UserRoleManager userRoleManager,
UserLoginManager userLoginManager, UserLoginManager userLoginManager,
AuditLogManager auditLogManager, AuditLogManager auditLogManager,
AgentManager agentManager, AgentManager agentManager,
AgentJavaRuntimesManager agentJavaRuntimesManager,
InstanceManager instanceManager,
InstanceLogManager instanceLogManager, InstanceLogManager instanceLogManager,
MinecraftVersions minecraftVersions, MinecraftVersions minecraftVersions,
EventLogManager eventLogManager, EventLogManager eventLogManager
TaskManager taskManager
) { ) {
this.actor = actorSystem.ActorOf(Actor.Factory(this), "Web-" + Interlocked.Increment(ref listenerSequenceId));
this.connection = connection; this.connection = connection;
this.authToken = authToken; this.authToken = authToken;
this.controllerState = controllerState;
this.userManager = userManager; this.userManager = userManager;
this.roleManager = roleManager; this.roleManager = roleManager;
this.userRoleManager = userRoleManager; this.userRoleManager = userRoleManager;
this.userLoginManager = userLoginManager; this.userLoginManager = userLoginManager;
this.auditLogManager = auditLogManager; this.auditLogManager = auditLogManager;
this.agentManager = agentManager; this.agentManager = agentManager;
this.agentJavaRuntimesManager = agentJavaRuntimesManager;
this.instanceManager = instanceManager;
this.instanceLogManager = instanceLogManager; this.instanceLogManager = instanceLogManager;
this.minecraftVersions = minecraftVersions; this.minecraftVersions = minecraftVersions;
this.eventLogManager = eventLogManager; this.eventLogManager = eventLogManager;
this.taskManager = taskManager;
} }
private void OnConnectionReady() { private sealed class Actor : ReceiveActor<ICommand> {
lock (this) { public static Props<ICommand> Factory(WebMessageListener listener) {
agentManager.AgentsChanged.Subscribe(this, HandleAgentsChanged); return Props<ICommand>.Create(() => new Actor(listener), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
instanceManager.InstancesChanged.Subscribe(this, HandleInstancesChanged); }
instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
private readonly WebMessageListener listener;
private Actor(WebMessageListener listener) {
this.listener = listener;
Receive<StartConnectionCommand>(StartConnection);
Receive<StopConnectionCommand>(StopConnection);
Receive<RefreshAgentsCommand>(RefreshAgents);
Receive<RefreshInstancesCommand>(RefreshInstances);
}
private void StartConnection(StartConnectionCommand command) {
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
}
private void StopConnection(StopConnectionCommand command) {
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
}
private void RefreshAgents(RefreshAgentsCommand command) {
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void RefreshInstances(RefreshInstancesCommand command) {
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
listener.connection.Send(message);
}
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
} }
} }
private void OnConnectionClosed() { private interface ICommand {}
lock (this) {
agentManager.AgentsChanged.Unsubscribe(this);
instanceManager.InstancesChanged.Unsubscribe(this);
instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
}
}
private void HandleAgentsChanged(ImmutableArray<Agent> agents) { private sealed record StartConnectionCommand : ICommand;
var message = new RefreshAgentsMessage(agents.Select(static agent => new AgentWithStats(agent.Guid, agent.Name, agent.ProtocolVersion, agent.BuildVersion, agent.MaxInstances, agent.MaxMemory, agent.AllowedServerPorts, agent.AllowedRconPorts, agent.Stats, agent.LastPing, agent.IsOnline)).ToImmutableArray());
taskManager.Run("Send agents to web", () => connection.Send(message));
}
private void HandleInstancesChanged(ImmutableDictionary<Guid, Instance> instances) { private sealed record StopConnectionCommand : ICommand;
var message = new RefreshInstancesMessage(instances.Values.ToImmutableArray());
taskManager.Run("Send instances to web", () => connection.Send(message));
}
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) { private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
taskManager.Run("Send instance logs to web", () => connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines)));
} private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) { public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
if (authToken.FixedTimeEquals(message.AuthToken)) { if (authToken.FixedTimeEquals(message.AuthToken)) {
@ -118,7 +143,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
} }
if (!connection.IsClosed) { if (!connection.IsClosed) {
OnConnectionReady(); actor.Tell(new StartConnectionCommand());
} }
return NoReply.Instance; return NoReply.Instance;
@ -127,7 +152,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
public Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message) { public Task<NoReply> HandleUnregisterWeb(UnregisterWebMessage message) {
if (!connection.IsClosed) { if (!connection.IsClosed) {
connection.Close(); connection.Close();
OnConnectionClosed(); actor.Tell(new StopConnectionCommand());
} }
return Task.FromResult(NoReply.Instance); return Task.FromResult(NoReply.Instance);
@ -162,19 +187,19 @@ public sealed class WebMessageListener : IMessageToControllerListener {
} }
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) { public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return instanceManager.CreateOrUpdateInstance(message.LoggedInUserGuid, message.Configuration); return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.Configuration));
} }
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) { public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
return instanceManager.LaunchInstance(message.LoggedInUserGuid, message.InstanceGuid); return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
} }
public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) { public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
return instanceManager.StopInstance(message.LoggedInUserGuid, message.InstanceGuid, message.StopStrategy); return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
} }
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) { public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return instanceManager.SendCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Command); return agentManager.DoInstanceAction<AgentActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendMinecraftCommandCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
} }
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) { public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
@ -182,7 +207,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
} }
public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) { public Task<ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>> HandleGetAgentJavaRuntimes(GetAgentJavaRuntimesMessage message) {
return Task.FromResult(agentJavaRuntimesManager.All); return Task.FromResult(controllerState.AgentJavaRuntimesByGuid);
} }
public Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) { public Task<ImmutableArray<AuditLogItem>> HandleGetAuditLog(GetAuditLogMessage message) {

View File

@ -51,26 +51,27 @@ try {
return 1; return 1;
} }
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server..."); PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
await controllerServices.Initialize(); var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) { await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate); await controllerServices.Initialize();
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc")); static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
try { return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
await Task.WhenAll( }
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken) var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
); try {
} finally { await Task.WhenAll(
await rpcTaskManager.Stop(); RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
NetMQConfig.Cleanup(); RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
} }
return 0; return 0;

View File

@ -1,12 +1,15 @@
<Project> <Project>
<ItemGroup> <ItemGroup>
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="8.0.0" /> <PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="8.0.0" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="8.0.0" /> <PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="8.0.0" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Relational" Version="8.0.0" /> </ItemGroup>
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="8.0.0" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.0" /> <ItemGroup>
<PackageReference Update="System.Linq.Async" Version="6.0.1" /> <PackageReference Update="Microsoft.EntityFrameworkCore.Relational" Version="8.0.0" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="8.0.0" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.0" />
<PackageReference Update="System.Linq.Async" Version="6.0.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
@ -14,12 +17,13 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" /> <PackageReference Update="Akka" Version="1.5.17.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Update="MemoryPack" Version="1.10.0" /> <PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" />
<PackageReference Update="NetMQ" Version="4.0.1.13" /> <PackageReference Update="MemoryPack" Version="1.10.0" />
<PackageReference Update="NetMQ" Version="4.0.1.13" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -44,6 +44,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Controller.Services
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils", "Utils\Phantom.Utils\Phantom.Utils.csproj", "{384885E2-5113-45C5-9B15-09BDA0911852}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils", "Utils\Phantom.Utils\Phantom.Utils.csproj", "{384885E2-5113-45C5-9B15-09BDA0911852}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Actor", "Utils\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj", "{BBFF32C1-A98A-44BF-9023-04344BBB896B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Events", "Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj", "{2E81523B-5DBE-4992-A77B-1679758D0688}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Events", "Utils\Phantom.Utils.Events\Phantom.Utils.Events.csproj", "{2E81523B-5DBE-4992-A77B-1679758D0688}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Logging", "Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj", "{FCA141F5-4F18-47C2-9855-14E326FF1219}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Phantom.Utils.Logging", "Utils\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj", "{FCA141F5-4F18-47C2-9855-14E326FF1219}"
@ -126,6 +128,10 @@ Global
{384885E2-5113-45C5-9B15-09BDA0911852}.Debug|Any CPU.Build.0 = Debug|Any CPU {384885E2-5113-45C5-9B15-09BDA0911852}.Debug|Any CPU.Build.0 = Debug|Any CPU
{384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.ActiveCfg = Release|Any CPU {384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.ActiveCfg = Release|Any CPU
{384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.Build.0 = Release|Any CPU {384885E2-5113-45C5-9B15-09BDA0911852}.Release|Any CPU.Build.0 = Release|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BBFF32C1-A98A-44BF-9023-04344BBB896B}.Release|Any CPU.Build.0 = Release|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E81523B-5DBE-4992-A77B-1679758D0688}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -171,6 +177,7 @@ Global
{4B3B73E6-48DD-4846-87FD-DFB86619B67C} = {0AB9471E-6228-4EB7-802E-3102B3952AAD} {4B3B73E6-48DD-4846-87FD-DFB86619B67C} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{90F0F1B1-EB0A-49C9-8DF0-1153A87F77C9} = {0AB9471E-6228-4EB7-802E-3102B3952AAD} {90F0F1B1-EB0A-49C9-8DF0-1153A87F77C9} = {0AB9471E-6228-4EB7-802E-3102B3952AAD}
{384885E2-5113-45C5-9B15-09BDA0911852} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {384885E2-5113-45C5-9B15-09BDA0911852} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{BBFF32C1-A98A-44BF-9023-04344BBB896B} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{2E81523B-5DBE-4992-A77B-1679758D0688} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {2E81523B-5DBE-4992-A77B-1679758D0688} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{FCA141F5-4F18-47C2-9855-14E326FF1219} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {FCA141F5-4F18-47C2-9855-14E326FF1219} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}
{BB112660-7A20-45E6-9195-65363B74027F} = {AA217EB8-E480-456B-BDF3-39419EF2AD85} {BB112660-7A20-45E6-9195-65363B74027F} = {AA217EB8-E480-456B-BDF3-39419EF2AD85}

View File

@ -0,0 +1,20 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
props = props.WithSupervisorStrategy(SupervisorStrategy);
}
if (MailboxType != null) {
props = props.WithMailbox(MailboxType);
}
return props;
}
}

View File

@ -0,0 +1,9 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public static class ActorExtensions {
public static ActorRef<TMessage> ActorOf<TMessage>(this IActorRefFactory factory, Props<TMessage> props, string? name) {
return new ActorRef<TMessage>(factory.ActorOf(props.Inner, name));
}
}

View File

@ -0,0 +1,19 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
sealed class ActorFactory<TActor> : IIndirectActorProducer where TActor : ActorBase {
public Type ActorType => typeof(TActor);
private readonly Func<TActor> constructor;
public ActorFactory(Func<TActor> constructor) {
this.constructor = constructor;
}
public ActorBase Produce() {
return constructor();
}
public void Release(ActorBase actor) {}
}

View File

@ -0,0 +1,31 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public readonly struct ActorRef<TMessage> {
private readonly IActorRef actorRef;
internal ActorRef(IActorRef actorRef) {
this.actorRef = actorRef;
}
internal bool IsSame<TOtherMessage>(ActorRef<TOtherMessage> other) {
return actorRef.Equals(other.actorRef);
}
public void Tell(TMessage message) {
actorRef.Tell(message);
}
public void Forward(TMessage message) {
actorRef.Forward(message);
}
public Task<TReply> Request<TReply>(ICanReply<TReply> message, TimeSpan? timeout, CancellationToken cancellationToken = default) {
return actorRef.Ask<TReply>(message, timeout, cancellationToken);
}
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
}

View File

@ -0,0 +1,31 @@
using Akka.Actor;
using Akka.Configuration;
namespace Phantom.Utils.Actor;
public static class ActorSystemFactory {
private const string Configuration =
"""
akka {
actor {
default-dispatcher = {
executor = task-executor
}
internal-dispatcher = akka.actor.default-dispatcher
debug.unhandled = on
}
loggers = [
"Phantom.Utils.Actor.Logging.SerilogLogger, Phantom.Utils.Actor"
]
}
unbounded-jump-ahead-mailbox {
mailbox-type : "Phantom.Utils.Actor.Mailbox.UnboundedJumpAheadMailbox, Phantom.Utils.Actor"
}
""";
private static readonly BootstrapSetup Setup = BootstrapSetup.Create().WithConfig(ConfigurationFactory.ParseString(Configuration));
public static ActorSystem Create(string name) {
return ActorSystem.Create(name, Setup);
}
}

View File

@ -0,0 +1,113 @@
namespace Phantom.Utils.Actor.Event;
public sealed class ObservableState<TState> {
private readonly ReaderWriterLockSlim rwLock = new (LockRecursionPolicy.NoRecursion);
private readonly List<IListener> listeners = new ();
private TState state;
public TState State {
get {
rwLock.EnterReadLock();
try {
return state;
} finally {
rwLock.ExitReadLock();
}
}
}
public Publisher PublisherSide { get; }
public Receiver ReceiverSide { get; }
public ObservableState(TState state) {
this.state = state;
this.PublisherSide = new Publisher(this);
this.ReceiverSide = new Receiver(this);
}
private interface IListener {
bool IsFor<TMessage>(ActorRef<TMessage> other);
void Notify(TState state);
}
private readonly record struct Listener<TMessage>(ActorRef<TMessage> Actor, Func<TState, TMessage> MessageFactory) : IListener {
public bool IsFor<TOtherMessage>(ActorRef<TOtherMessage> other) {
return Actor.IsSame(other);
}
public void Notify(TState state) {
Actor.Tell(MessageFactory(state));
}
}
public readonly struct Publisher {
private readonly ObservableState<TState> owner;
internal Publisher(ObservableState<TState> owner) {
this.owner = owner;
}
public void Publish(TState state) {
Publish(static (_, newState) => newState, state);
}
public void Publish<TArg>(Func<TState, TArg, TState> stateUpdater, TArg userObject) {
owner.rwLock.EnterWriteLock();
try {
SetInternalState(stateUpdater(owner.state, userObject));
} finally {
owner.rwLock.ExitWriteLock();
}
}
public void Publish<TArg1, TArg2>(Func<TState, TArg1, TArg2, TState> stateUpdater, TArg1 userObject1, TArg2 userObject2) {
owner.rwLock.EnterWriteLock();
try {
SetInternalState(stateUpdater(owner.state, userObject1, userObject2));
} finally {
owner.rwLock.ExitWriteLock();
}
}
private void SetInternalState(TState state) {
owner.state = state;
foreach (var listener in owner.listeners) {
listener.Notify(state);
}
}
}
public readonly struct Receiver {
private readonly ObservableState<TState> owner;
internal Receiver(ObservableState<TState> owner) {
this.owner = owner;
}
public void Register<TMessage>(ActorRef<TMessage> actor, Func<TState, TMessage> messageFactory) {
var listener = new Listener<TMessage>(actor, messageFactory);
owner.rwLock.EnterReadLock();
try {
owner.listeners.Add(listener);
listener.Notify(owner.state);
} finally {
owner.rwLock.ExitReadLock();
}
}
public void Unregister<TMessage>(ActorRef<TMessage> actor) {
owner.rwLock.EnterWriteLock();
try {
int index = owner.listeners.FindIndex(listener => listener.IsFor(actor));
if (index != -1) {
owner.listeners.RemoveAt(index);
}
} finally {
owner.rwLock.ExitWriteLock();
}
}
}
}

View File

@ -0,0 +1,5 @@
using System.Diagnostics.CodeAnalysis;
namespace Phantom.Utils.Actor;
public interface ICanReply<[SuppressMessage("ReSharper", "UnusedTypeParameter")] TReply> {}

View File

@ -0,0 +1,68 @@
using System.Diagnostics.CodeAnalysis;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
using Phantom.Utils.Logging;
using Serilog;
using Serilog.Core.Enrichers;
using Serilog.Events;
using LogEvent = Akka.Event.LogEvent;
namespace Phantom.Utils.Actor.Logging;
[SuppressMessage("ReSharper", "UnusedType.Global")]
public sealed class SerilogLogger : ReceiveActor, IRequiresMessageQueue<ILoggerMessageQueueSemantics> {
private readonly Dictionary<string, ILogger> loggersBySource = new ();
public SerilogLogger() {
Receive<InitializeLogger>(Initialize);
Receive<Debug>(LogDebug);
Receive<Info>(LogInfo);
Receive<Warning>(LogWarning);
Receive<Error>(LogError);
}
private void Initialize(InitializeLogger message) {
Sender.Tell(new LoggerInitialized());
}
private void LogDebug(Debug item) {
Log(item, LogEventLevel.Debug);
}
private void LogInfo(Info item) {
Log(item, LogEventLevel.Information);
}
private void LogWarning(Warning item) {
Log(item, LogEventLevel.Warning);
}
private void LogError(Error item) {
Log(item, LogEventLevel.Error);
}
private void Log(LogEvent item, LogEventLevel level) {
GetLogger(item).Write(level, item.Cause, GetFormat(item), GetArgs(item));
}
private ILogger GetLogger(LogEvent item) {
var source = item.LogSource;
if (!loggersBySource.TryGetValue(source, out var logger)) {
var loggerName = source[(source.IndexOf(':') + 1)..];
loggersBySource[source] = logger = PhantomLogger.Create("Akka", loggerName);
}
return logger;
}
private static string GetFormat(LogEvent item) {
return item.Message is LogMessage logMessage ? logMessage.Format : "{Message:l}";
}
private static object[] GetArgs(LogEvent item) {
return item.Message is LogMessage logMessage ? logMessage.Parameters().Where(static a => a is not PropertyEnricher).ToArray() : new[] { item.Message };
}
}

View File

@ -0,0 +1,6 @@
namespace Phantom.Utils.Actor.Mailbox;
/// <summary>
/// Marker interface for messages that jump ahead in the <see cref="UnboundedJumpAheadMailbox"/>.
/// </summary>
public interface IJumpAhead {}

View File

@ -0,0 +1,16 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Dispatch.MessageQueues;
namespace Phantom.Utils.Actor.Mailbox;
public sealed class UnboundedJumpAheadMailbox : MailboxType, IProducesMessageQueue<UnboundedJumpAheadMessageQueue> {
public const string Name = "unbounded-jump-ahead-mailbox";
public UnboundedJumpAheadMailbox(Settings settings, Config config) : base(settings, config) {}
public override IMessageQueue Create(IActorRef owner, ActorSystem system) {
return new UnboundedJumpAheadMessageQueue();
}
}

View File

@ -0,0 +1,24 @@
using Akka.Actor;
using Akka.Dispatch.MessageQueues;
namespace Phantom.Utils.Actor.Mailbox;
sealed class UnboundedJumpAheadMessageQueue : BlockingMessageQueue {
private readonly Queue<Envelope> highPriorityQueue = new ();
private readonly Queue<Envelope> lowPriorityQueue = new ();
protected override int LockedCount => highPriorityQueue.Count + lowPriorityQueue.Count;
protected override void LockedEnqueue(Envelope envelope) {
if (envelope.Message is IJumpAhead) {
highPriorityQueue.Enqueue(envelope);
}
else {
lowPriorityQueue.Enqueue(envelope);
}
}
protected override bool LockedTryDequeue(out Envelope envelope) {
return highPriorityQueue.TryDequeue(out envelope) || lowPriorityQueue.TryDequeue(out envelope);
}
}

View File

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,19 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public sealed class Props<TMessage> {
internal Props Inner { get; }
private Props(Props inner) {
Inner = inner;
}
private static Props CreateInner<TActor>(Func<TActor> factory) where TActor : ReceiveActor<TMessage> {
return Props.CreateBy(new ActorFactory<TActor>(factory));
}
public static Props<TMessage> Create<TActor>(Func<TActor> factory, ActorConfiguration configuration) where TActor : ReceiveActor<TMessage> {
return new Props<TMessage>(configuration.Apply(CreateInner(factory)));
}
}

View File

@ -0,0 +1,51 @@
using Akka.Actor;
namespace Phantom.Utils.Actor;
public abstract class ReceiveActor<TMessage> : ReceiveActor {
protected ActorRef<TMessage> SelfTyped => new (Self);
protected void ReceiveAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
Receive<TReplyableCommand>(message => HandleMessageWithReply(action, message));
}
protected void ReceiveAsyncAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
}
protected void ReceiveAsyncAndReplyLater<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
}
private void HandleMessageWithReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
try {
Sender.Tell(action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
var taskCompletionSource = new TaskCompletionSource<TReply>();
var sender = Sender;
taskCompletionSource.Task.ContinueWith(task => ((ICanTell) sender).Tell())
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
protected void CompleteWith<TReply>(TaskCompletionSource<TReply> taskCompletionSource, Func<Task<TReply>> task) {
}
}

View File

@ -0,0 +1,10 @@
using Akka.Actor;
using Akka.Util.Internal;
namespace Phantom.Utils.Actor;
public static class SupervisorStrategies {
private static DeployableDecider DefaultDecider { get; } = SupervisorStrategy.DefaultDecider.AsInstanceOf<DeployableDecider>();
public static SupervisorStrategy Resume { get; } = new OneForOneStrategy(Decider.From(Directive.Resume, DefaultDecider.Pairs));
}

View File

@ -1,102 +0,0 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
namespace Phantom.Utils.Collections;
public sealed class RwLockedObservableDictionary<TKey, TValue> where TKey : notnull {
public event EventHandler? CollectionChanged;
private readonly RwLockedDictionary<TKey, TValue> dict;
public RwLockedObservableDictionary(LockRecursionPolicy recursionPolicy) {
this.dict = new RwLockedDictionary<TKey, TValue>(recursionPolicy);
}
public RwLockedObservableDictionary(int capacity, LockRecursionPolicy recursionPolicy) {
this.dict = new RwLockedDictionary<TKey, TValue>(capacity, recursionPolicy);
}
private void FireCollectionChanged() {
CollectionChanged?.Invoke(this, EventArgs.Empty);
}
private bool FireCollectionChangedIf(bool result) {
if (result) {
FireCollectionChanged();
return true;
}
else {
return false;
}
}
public TValue this[TKey key] {
get => dict[key];
set {
dict[key] = value;
FireCollectionChanged();
}
}
public ImmutableArray<TValue> ValuesCopy => dict.ValuesCopy;
public void ForEachValue(Action<TValue> action) {
dict.ForEachValue(action);
}
public bool TryGetValue(TKey key, [MaybeNullWhen(false)] out TValue value) {
return dict.TryGetValue(key, out value);
}
public bool GetOrAdd(TKey key, Func<TKey, TValue> valueFactory, out TValue value) {
return FireCollectionChangedIf(dict.GetOrAdd(key, valueFactory, out value));
}
public bool TryAdd(TKey key, TValue newValue) {
return FireCollectionChangedIf(dict.TryAdd(key, newValue));
}
public bool AddOrReplace(TKey key, TValue newValue, [MaybeNullWhen(false)] out TValue oldValue) {
return FireCollectionChangedIf(dict.AddOrReplace(key, newValue, out oldValue));
}
public bool AddOrReplaceIf(TKey key, TValue newValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.AddOrReplaceIf(key, newValue, replaceCondition));
}
public bool TryReplace(TKey key, Func<TValue, TValue> replacementValue) {
return FireCollectionChangedIf(dict.TryReplace(key, replacementValue));
}
public bool TryReplaceIf(TKey key, Func<TValue, TValue> replacementValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.TryReplaceIf(key, replacementValue, replaceCondition));
}
public bool ReplaceAll(Func<TValue, TValue> replacementValue) {
return FireCollectionChangedIf(dict.ReplaceAll(replacementValue));
}
public bool ReplaceAllIf(Func<TValue, TValue> replacementValue, Predicate<TValue> replaceCondition) {
return FireCollectionChangedIf(dict.ReplaceAllIf(replacementValue, replaceCondition));
}
public bool Remove(TKey key) {
return FireCollectionChangedIf(dict.Remove(key));
}
public bool RemoveIf(TKey key, Predicate<TValue> removeCondition) {
return FireCollectionChangedIf(dict.RemoveIf(key, removeCondition));
}
public bool RemoveAll(Predicate<KeyValuePair<TKey, TValue>> removeCondition) {
return FireCollectionChangedIf(dict.RemoveAll(removeCondition));
}
public ImmutableDictionary<TKey, TValue> ToImmutable() {
return dict.ToImmutable();
}
public ImmutableDictionary<TKey, TNewValue> ToImmutable<TNewValue>(Func<TValue, TNewValue> valueSelector) {
return dict.ToImmutable(valueSelector);
}
}

View File

@ -6,19 +6,19 @@ using Phantom.Utils.Logging;
namespace Phantom.Web.Services.Agents; namespace Phantom.Web.Services.Agents;
public sealed class AgentManager { public sealed class AgentManager {
private readonly SimpleObservableState<ImmutableArray<AgentWithStats>> agents = new (PhantomLogger.Create<AgentManager>("Agents"), ImmutableArray<AgentWithStats>.Empty); private readonly SimpleObservableState<ImmutableArray<Agent>> agents = new (PhantomLogger.Create<AgentManager>("Agents"), ImmutableArray<Agent>.Empty);
public EventSubscribers<ImmutableArray<AgentWithStats>> AgentsChanged => agents.Subs; public EventSubscribers<ImmutableArray<Agent>> AgentsChanged => agents.Subs;
internal void RefreshAgents(ImmutableArray<AgentWithStats> newAgents) { internal void RefreshAgents(ImmutableArray<Agent> newAgents) {
agents.SetTo(newAgents); agents.SetTo(newAgents);
} }
public ImmutableArray<AgentWithStats> GetAll() { public ImmutableArray<Agent> GetAll() {
return agents.Value; return agents.Value;
} }
public ImmutableDictionary<Guid, AgentWithStats> ToDictionaryByGuid() { public ImmutableDictionary<Guid, Agent> ToDictionaryByGuid() {
return agents.Value.ToImmutableDictionary(static agent => agent.Guid); return agents.Value.ToImmutableDictionary(static agent => agent.Configuration.AgentGuid);
} }
} }

View File

@ -39,18 +39,18 @@ public sealed class InstanceManager {
return controllerConnection.Send<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(message, cancellationToken); return controllerConnection.Send<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(message, cancellationToken);
} }
public Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid loggedInUserGuid, Guid instanceGuid, CancellationToken cancellationToken) { public Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, CancellationToken cancellationToken) {
var message = new LaunchInstanceMessage(loggedInUserGuid, instanceGuid); var message = new LaunchInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid);
return controllerConnection.Send<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(message, cancellationToken); return controllerConnection.Send<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(message, cancellationToken);
} }
public Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid loggedInUserGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy, CancellationToken cancellationToken) { public Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, MinecraftStopStrategy stopStrategy, CancellationToken cancellationToken) {
var message = new StopInstanceMessage(loggedInUserGuid, instanceGuid, stopStrategy); var message = new StopInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid, stopStrategy);
return controllerConnection.Send<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(message, cancellationToken); return controllerConnection.Send<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(message, cancellationToken);
} }
public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommandToInstance(Guid loggedInUserGuid, Guid instanceGuid, string command, CancellationToken cancellationToken) { public Task<InstanceActionResult<SendCommandToInstanceResult>> SendCommandToInstance(Guid loggedInUserGuid, Guid agentGuid, Guid instanceGuid, string command, CancellationToken cancellationToken) {
var message = new SendCommandToInstanceMessage(loggedInUserGuid, instanceGuid, command); var message = new SendCommandToInstanceMessage(loggedInUserGuid, agentGuid, instanceGuid, command);
return controllerConnection.Send<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(message, cancellationToken); return controllerConnection.Send<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(message, cancellationToken);
} }
} }

View File

@ -18,42 +18,47 @@
</HeaderRow> </HeaderRow>
<ItemRow Context="agent"> <ItemRow Context="agent">
@{ @{
var configuration = agent.Configuration;
var usedInstances = agent.Stats?.RunningInstanceCount; var usedInstances = agent.Stats?.RunningInstanceCount;
var usedMemory = agent.Stats?.RunningInstanceMemory.InMegabytes; var usedMemory = agent.Stats?.RunningInstanceMemory.InMegabytes;
} }
<Cell> <Cell>
<p class="fw-semibold">@agent.Name</p> <p class="fw-semibold">@configuration.AgentName</p>
<small class="font-monospace text-uppercase">@agent.Guid.ToString()</small> <small class="font-monospace text-uppercase">@configuration.AgentGuid.ToString()</small>
</Cell> </Cell>
<Cell class="text-end"> <Cell class="text-end">
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@agent.MaxInstances"> <ProgressBar Value="@(usedInstances ?? 0)" Maximum="@configuration.MaxInstances">
@(usedInstances?.ToString() ?? "?") / @agent.MaxInstances.ToString() @(usedInstances?.ToString() ?? "?") / @configuration.MaxInstances.ToString()
</ProgressBar> </ProgressBar>
</Cell> </Cell>
<Cell class="text-end"> <Cell class="text-end">
<ProgressBar Value="@(usedMemory ?? 0)" Maximum="@agent.MaxMemory.InMegabytes"> <ProgressBar Value="@(usedMemory ?? 0)" Maximum="@configuration.MaxMemory.InMegabytes">
@(usedMemory?.ToString() ?? "?") / @agent.MaxMemory.InMegabytes.ToString() MB @(usedMemory?.ToString() ?? "?") / @configuration.MaxMemory.InMegabytes.ToString() MB
</ProgressBar> </ProgressBar>
</Cell> </Cell>
<Cell class="text-condensed"> <Cell class="text-condensed">
Build: <span class="font-monospace">@agent.BuildVersion</span> Build: <span class="font-monospace">@configuration.BuildVersion</span>
<br> <br>
Protocol: <span class="font-monospace">v@(agent.ProtocolVersion.ToString())</span> Protocol: <span class="font-monospace">v@(configuration.ProtocolVersion.ToString())</span>
</Cell> </Cell>
@if (agent.IsOnline) { @switch (agent.ConnectionStatus) {
<Cell class="fw-semibold text-center text-success">Online</Cell> case AgentIsOnline:
<Cell class="text-end">-</Cell> <Cell class="fw-semibold text-center text-success">Online</Cell>
} <Cell class="text-end">-</Cell>
else { break;
<Cell class="fw-semibold text-center">Offline</Cell> case AgentIsOffline:
<Cell class="text-end"> <Cell class="fw-semibold text-center">Offline</Cell>
@if (agent.LastPing is {} lastPing) { <Cell class="text-end">N/A</Cell>
<TimeWithOffset Time="lastPing" /> break;
} case AgentIsDisconnected status:
else { <Cell class="fw-semibold text-center">Offline</Cell>
<text>N/A</text> <Cell class="text-end">
} <TimeWithOffset Time="status.LastPingTime" />
</Cell> </Cell>
break;
default:
<Cell class="fw-semibold text-center">N/A</Cell>
break;
} }
</ItemRow> </ItemRow>
<NoItemsRow> <NoItemsRow>
@ -63,12 +68,12 @@
@code { @code {
private readonly TableData<AgentWithStats, Guid> agentTable = new(); private readonly TableData<Agent, Guid> agentTable = new();
protected override void OnInitialized() { protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => { AgentManager.AgentsChanged.Subscribe(this, agents => {
var sortedAgents = agents.Sort(static (a1, a2) => a1.Name.CompareTo(a2.Name)); var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.AgentName.CompareTo(a2.Configuration.AgentName));
agentTable.UpdateFrom(sortedAgents, static agent => agent.Guid, static agent => agent, static (agent, _) => agent); agentTable.UpdateFrom(sortedAgents, static agent => agent.Configuration.AgentGuid, static agent => agent, static (agent, _) => agent);
InvokeAsync(StateHasChanged); InvokeAsync(StateHasChanged);
}); });
} }

View File

@ -61,7 +61,7 @@
try { try {
logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken); logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken);
agentNamesByGuid = AgentManager.GetAll().ToImmutableDictionary(static kvp => kvp.Guid, static kvp => kvp.Name); agentNamesByGuid = AgentManager.GetAll().Select(static agent => agent.Configuration).ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.AgentName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName); instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
} finally { } finally {
initializationCancellationTokenSource.Dispose(); initializationCancellationTokenSource.Dispose();

View File

@ -41,10 +41,10 @@ else {
<PermissionView Permission="Permission.ControlInstances"> <PermissionView Permission="Permission.ControlInstances">
<div class="mb-3"> <div class="mb-3">
<InstanceCommandInput InstanceGuid="InstanceGuid" Disabled="@(!Instance.Status.CanSendCommand())" /> <InstanceCommandInput AgentGuid="Instance.Configuration.AgentGuid" InstanceGuid="InstanceGuid" Disabled="@(!Instance.Status.CanSendCommand())" />
</div> </div>
<InstanceStopDialog InstanceGuid="InstanceGuid" ModalId="stop-instance" Disabled="@(!Instance.Status.CanStop())" /> <InstanceStopDialog AgentGuid="Instance.Configuration.AgentGuid" InstanceGuid="InstanceGuid" ModalId="stop-instance" Disabled="@(!Instance.Status.CanStop())" />
</PermissionView> </PermissionView>
} }
@ -79,7 +79,12 @@ else {
return; return;
} }
var result = await InstanceManager.LaunchInstance(loggedInUserGuid.Value, InstanceGuid, CancellationToken); if (Instance == null) {
lastError = "Instance not found.";
return;
}
var result = await InstanceManager.LaunchInstance(loggedInUserGuid.Value, Instance.Configuration.AgentGuid, InstanceGuid, CancellationToken);
if (!result.Is(LaunchInstanceResult.LaunchInitiated)) { if (!result.Is(LaunchInstanceResult.LaunchInitiated)) {
lastError = result.ToSentence(Messages.ToSentence); lastError = result.ToSentence(Messages.ToSentence);
} }

View File

@ -66,7 +66,7 @@
protected override void OnInitialized() { protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => { AgentManager.AgentsChanged.Subscribe(this, agents => {
this.agentNamesByGuid = agents.ToImmutableDictionary(static agent => agent.Guid, static agent => agent.Name); this.agentNamesByGuid = agents.Select(static agent => agent.Configuration).ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.AgentName);
InvokeAsync(StateHasChanged); InvokeAsync(StateHasChanged);
}); });

View File

@ -26,20 +26,21 @@
<div class="row"> <div class="row">
<div class="col-xl-7 mb-3"> <div class="col-xl-7 mb-3">
@{ @{
static RenderFragment GetAgentOption(AgentWithStats agent) { static RenderFragment GetAgentOption(Agent agent) {
return @<option value="@agent.Guid"> var configuration = agent.Configuration;
@agent.Name return @<option value="@configuration.AgentGuid">
@configuration.AgentName
&bullet; &bullet;
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(agent.MaxInstances) @(agent.MaxInstances == 1 ? "Instance" : "Instances") @(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(configuration.MaxInstances) @(configuration.MaxInstances == 1 ? "Instance" : "Instances")
&bullet; &bullet;
@(agent.Stats?.RunningInstanceMemory.InMegabytes.ToString() ?? "?")/@(agent.MaxMemory.InMegabytes) MB RAM @(agent.Stats?.RunningInstanceMemory.InMegabytes.ToString() ?? "?")/@(configuration.MaxMemory.InMegabytes) MB RAM
</option>; </option>;
} }
} }
@if (EditedInstanceConfiguration == null) { @if (EditedInstanceConfiguration == null) {
<FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid"> <FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid">
<option value="" selected>Select which agent will run the instance...</option> <option value="" selected>Select which agent will run the instance...</option>
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.IsOnline).OrderBy(static agent => agent.Name)) { @foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.AgentName)) {
@GetAgentOption(agent) @GetAgentOption(agent)
} }
</FormSelectInput> </FormSelectInput>
@ -97,8 +98,8 @@
</div> </div>
@{ @{
string? allowedServerPorts = selectedAgent?.AllowedServerPorts?.ToString(); string? allowedServerPorts = selectedAgent?.Configuration.AllowedServerPorts?.ToString();
string? allowedRconPorts = selectedAgent?.AllowedRconPorts?.ToString(); string? allowedRconPorts = selectedAgent?.Configuration.AllowedRconPorts?.ToString();
} }
<div class="col-sm-6 col-xl-2 mb-3"> <div class="col-sm-6 col-xl-2 mb-3">
<FormNumberInput Id="instance-server-port" @bind-Value="form.ServerPort" min="0" max="65535"> <FormNumberInput Id="instance-server-port" @bind-Value="form.ServerPort" min="0" max="65535">
@ -141,7 +142,7 @@
<text>RAM</text> <text>RAM</text>
} }
else { else {
<text>RAM &bullet; <code>@(form.MemoryAllocation?.InMegabytes ?? 0) / @(selectedAgent?.MaxMemory.InMegabytes) MB</code></text> <text>RAM &bullet; <code>@(form.MemoryAllocation?.InMegabytes ?? 0) / @(selectedAgent?.Configuration.MaxMemory.InMegabytes) MB</code></text>
} }
</LabelFragment> </LabelFragment>
</FormNumberInput> </FormNumberInput>
@ -169,7 +170,7 @@
private ConfigureInstanceFormModel form = null!; private ConfigureInstanceFormModel form = null!;
private ImmutableDictionary<Guid, AgentWithStats> allAgentsByGuid = ImmutableDictionary<Guid, AgentWithStats>.Empty; private ImmutableDictionary<Guid, Agent> allAgentsByGuid = ImmutableDictionary<Guid, Agent>.Empty;
private ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> allAgentJavaRuntimes = ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>.Empty; private ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>> allAgentJavaRuntimes = ImmutableDictionary<Guid, ImmutableArray<TaggedJavaRuntime>>.Empty;
private MinecraftVersionType minecraftVersionType = MinecraftVersionType.Release; private MinecraftVersionType minecraftVersionType = MinecraftVersionType.Release;
@ -197,15 +198,15 @@
} }
} }
private bool TryGetAgent(Guid? agentGuid, [NotNullWhen(true)] out AgentWithStats? agent) { private bool TryGetAgent(Guid? agentGuid, [NotNullWhen(true)] out Agent? agent) {
return TryGet(page.allAgentsByGuid, agentGuid, out agent); return TryGet(page.allAgentsByGuid, agentGuid, out agent);
} }
public AgentWithStats? SelectedAgent => TryGetAgent(SelectedAgentGuid, out var agent) ? agent : null; public Agent? SelectedAgent => TryGetAgent(SelectedAgentGuid, out var agent) ? agent : null;
public ImmutableArray<TaggedJavaRuntime> JavaRuntimesForSelectedAgent => TryGet(page.allAgentJavaRuntimes, SelectedAgentGuid, out var javaRuntimes) ? javaRuntimes : ImmutableArray<TaggedJavaRuntime>.Empty; public ImmutableArray<TaggedJavaRuntime> JavaRuntimesForSelectedAgent => TryGet(page.allAgentJavaRuntimes, SelectedAgentGuid, out var javaRuntimes) ? javaRuntimes : ImmutableArray<TaggedJavaRuntime>.Empty;
public ushort MaximumMemoryUnits => SelectedAgent?.MaxMemory.RawValue ?? 0; public ushort MaximumMemoryUnits => SelectedAgent?.Configuration.MaxMemory.RawValue ?? 0;
public ushort AvailableMemoryUnits => Math.Min((SelectedAgent?.AvailableMemory + editedInstanceRamAllocation)?.RawValue ?? MaximumMemoryUnits, MaximumMemoryUnits); public ushort AvailableMemoryUnits => Math.Min((SelectedAgent?.AvailableMemory + editedInstanceRamAllocation)?.RawValue ?? MaximumMemoryUnits, MaximumMemoryUnits);
private ushort selectedMemoryUnits = 4; private ushort selectedMemoryUnits = 4;
@ -246,12 +247,12 @@
public sealed class ServerPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> { public sealed class ServerPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> {
protected override string FieldName => nameof(ServerPort); protected override string FieldName => nameof(ServerPort);
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.AllowedServerPorts?.Contains((ushort) value) == true; protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.Configuration.AllowedServerPorts?.Contains((ushort) value) == true;
} }
public sealed class RconPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> { public sealed class RconPortMustBeAllowedAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int> {
protected override string FieldName => nameof(RconPort); protected override string FieldName => nameof(RconPort);
protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.AllowedRconPorts?.Contains((ushort) value) == true; protected override bool IsValid(ConfigureInstanceFormModel model, int value) => model.SelectedAgent is not {} agent || agent.Configuration.AllowedRconPorts?.Contains((ushort) value) == true;
} }
public sealed class RconPortMustDifferFromServerPortAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int?> { public sealed class RconPortMustDifferFromServerPortAttribute : FormValidationAttribute<ConfigureInstanceFormModel, int?> {
@ -328,7 +329,7 @@
} }
var instance = new InstanceConfiguration( var instance = new InstanceConfiguration(
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Guid, EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Configuration.AgentGuid,
EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(), EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(),
form.InstanceName, form.InstanceName,
(ushort) form.ServerPort, (ushort) form.ServerPort,

View File

@ -16,7 +16,10 @@
@code { @code {
[Parameter] [Parameter, EditorRequired]
public Guid AgentGuid { get; set; }
[Parameter, EditorRequired]
public Guid InstanceGuid { get; set; } public Guid InstanceGuid { get; set; }
[Parameter] [Parameter]
@ -39,7 +42,7 @@
return; return;
} }
var result = await InstanceManager.SendCommandToInstance(loggedInUserGuid.Value, InstanceGuid, form.Command, CancellationToken); var result = await InstanceManager.SendCommandToInstance(loggedInUserGuid.Value, AgentGuid, InstanceGuid, form.Command, CancellationToken);
if (result.Is(SendCommandToInstanceResult.Success)) { if (result.Is(SendCommandToInstanceResult.Success)) {
form.Command = string.Empty; form.Command = string.Empty;
form.SubmitModel.StopSubmitting(); form.SubmitModel.StopSubmitting();

View File

@ -31,6 +31,9 @@
@code { @code {
[Parameter, EditorRequired]
public Guid AgentGuid { get; init; }
[Parameter, EditorRequired] [Parameter, EditorRequired]
public Guid InstanceGuid { get; init; } public Guid InstanceGuid { get; init; }
@ -56,7 +59,7 @@
return; return;
} }
var result = await InstanceManager.StopInstance(loggedInUserGuid.Value, InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds), CancellationToken); var result = await InstanceManager.StopInstance(loggedInUserGuid.Value, AgentGuid, InstanceGuid, new MinecraftStopStrategy(form.StopInSeconds), CancellationToken);
if (result.Is(StopInstanceResult.StopInitiated)) { if (result.Is(StopInstanceResult.StopInitiated)) {
await Js.InvokeVoidAsync("closeModal", ModalId); await Js.InvokeVoidAsync("closeModal", ModalId);
form.SubmitModel.StopSubmitting(); form.SubmitModel.StopSubmitting();