mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 07:42:58 +01:00
Compare commits
2 Commits
cddc3c404b
...
d241ed9f2f
Author | SHA1 | Date | |
---|---|---|---|
d241ed9f2f | |||
7eb72b7360 |
@ -5,8 +5,8 @@ namespace Phantom.Common.Data.Web.Agent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record AgentConfiguration(
|
||||
[property: MemoryPackOrder(0)] Guid Guid,
|
||||
[property: MemoryPackOrder(1)] string Name,
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid,
|
||||
[property: MemoryPackOrder(1)] string AgentName,
|
||||
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
|
||||
[property: MemoryPackOrder(3)] string BuildVersion,
|
||||
[property: MemoryPackOrder(4)] ushort MaxInstances,
|
||||
@ -15,6 +15,6 @@ public sealed partial record AgentConfiguration(
|
||||
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts = null
|
||||
) {
|
||||
public static AgentConfiguration From(AgentInfo agentInfo) {
|
||||
return new AgentConfiguration(agentInfo.Guid, agentInfo.Name, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
||||
return new AgentConfiguration(agentInfo.AgentGuid, agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
||||
}
|
||||
}
|
||||
|
@ -4,8 +4,8 @@ namespace Phantom.Common.Data.Agent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record AgentInfo(
|
||||
[property: MemoryPackOrder(0)] Guid Guid,
|
||||
[property: MemoryPackOrder(1)] string Name,
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid,
|
||||
[property: MemoryPackOrder(1)] string AgentName,
|
||||
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
|
||||
[property: MemoryPackOrder(3)] string BuildVersion,
|
||||
[property: MemoryPackOrder(4)] ushort MaxInstances,
|
||||
|
@ -3,3 +3,12 @@
|
||||
public enum ConfigureInstanceResult : byte {
|
||||
Success
|
||||
}
|
||||
|
||||
public static class ConfigureInstanceResultExtensions {
|
||||
public static string ToSentence(this ConfigureInstanceResult reason) {
|
||||
return reason switch {
|
||||
ConfigureInstanceResult.Success => "Success.",
|
||||
_ => "Unknown error."
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -6,21 +6,20 @@ using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Java;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Data.Web.Agent;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Minecraft;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Database.Entities;
|
||||
using Phantom.Controller.Database.Repositories;
|
||||
using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Actor.Mailbox;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Phantom.Utils.Tasks;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
@ -31,7 +30,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
|
||||
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
|
||||
|
||||
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, TaskManager TaskManager, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
||||
@ -40,13 +39,15 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
private readonly ControllerState controllerState;
|
||||
private readonly MinecraftVersions minecraftVersions;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private AgentConfiguration configuration;
|
||||
private AgentStats? stats;
|
||||
private AgentConnection? connection;
|
||||
private ImmutableArray<TaggedJavaRuntime> javaRuntimes = ImmutableArray<TaggedJavaRuntime>.Empty;
|
||||
|
||||
private readonly AgentConnection connection;
|
||||
|
||||
private DateTimeOffset? lastPingTime;
|
||||
private bool isOnline;
|
||||
|
||||
@ -64,16 +65,23 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
}
|
||||
}
|
||||
|
||||
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
|
||||
|
||||
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
|
||||
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
|
||||
|
||||
private AgentActor(Init init) {
|
||||
this.configuration = init.Configuration;
|
||||
this.controllerState = init.ControllerState;
|
||||
this.minecraftVersions = init.MinecraftVersions;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.taskManager = init.TaskManager;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
this.configuration = init.Configuration;
|
||||
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
|
||||
|
||||
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
|
||||
|
||||
NotifyAgentUpdated();
|
||||
|
||||
ReceiveAsync<InitializeCommand>(Initialize);
|
||||
@ -83,10 +91,12 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
|
||||
Receive<UpdateStatsCommand>(UpdateStats);
|
||||
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
|
||||
ReceiveAsyncAndReply<SendMessageCommand, object?>(SendMessage);
|
||||
ReceiveAsyncAndReply<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
||||
Receive<TellInstanceCommand>(TellInstance);
|
||||
Receive<UpdateInstanceDataCommand>(UpdateInstanceData);
|
||||
ReceiveAsyncAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
||||
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
|
||||
Receive<LaunchInstanceCommand>(LaunchInstance);
|
||||
Receive<StopInstanceCommand>(StopInstance);
|
||||
Receive<SendMinecraftCommandCommand>(SendMinecraftCommand);
|
||||
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
|
||||
}
|
||||
|
||||
private void NotifyAgentUpdated() {
|
||||
@ -99,10 +109,12 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
||||
var init = new InstanceActor.Init(instance, SelfTyped, dbProvider, cancellationToken);
|
||||
var name = "Instance:" + instance.Configuration.InstanceGuid;
|
||||
return Context.ActorOf(InstanceActor.Factory(init), name);
|
||||
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
|
||||
UpdateInstanceData(instance);
|
||||
|
||||
var instanceActor = CreateInstanceActor(instance);
|
||||
instanceActorByGuid.Add(instance.Configuration.InstanceGuid, instanceActor);
|
||||
return instanceActor;
|
||||
}
|
||||
|
||||
private void UpdateInstanceData(Instance instance) {
|
||||
@ -110,11 +122,26 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
controllerState.UpdateInstance(instance);
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
||||
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
|
||||
var name = "Instance:" + instance.Configuration.InstanceGuid;
|
||||
return Context.ActorOf(InstanceActor.Factory(init), name);
|
||||
}
|
||||
|
||||
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
|
||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||
instance.Forward(command);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||
}
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
private sealed record InitializeCommand : ICommand;
|
||||
|
||||
public sealed record RegisterCommand(AgentConfiguration Configuration, AgentConnection Connection) : ICommand, ICanReply<ImmutableArray<Instance>>;
|
||||
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<Instance>>;
|
||||
|
||||
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
|
||||
|
||||
@ -126,17 +153,21 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
|
||||
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
|
||||
|
||||
public sealed record SendMessageCommand(Func<AgentConnection, Task<object>> SendMessageViaConnection) : ICommand, ICanReply<object?>;
|
||||
|
||||
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
|
||||
|
||||
public sealed record TellInstanceCommand(Guid InstanceGuid, InstanceActor.ICommand Command) : ICommand;
|
||||
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
|
||||
|
||||
public sealed record UpdateInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
|
||||
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||
|
||||
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||
|
||||
public sealed record SendMinecraftCommandCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||
|
||||
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
|
||||
|
||||
private async Task Initialize(InitializeCommand command) {
|
||||
await using var ctx = dbProvider.Eager();
|
||||
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == configuration.Guid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
|
||||
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == configuration.AgentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
|
||||
var instanceConfiguration = new InstanceConfiguration(
|
||||
entity.AgentGuid,
|
||||
entity.InstanceGuid,
|
||||
@ -150,33 +181,28 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
JvmArgumentsHelper.Split(entity.JvmArguments)
|
||||
);
|
||||
|
||||
var instance = Instance.Offline(instanceConfiguration, entity.LaunchAutomatically);
|
||||
UpdateInstanceData(instance);
|
||||
instanceActorByGuid[instanceConfiguration.InstanceGuid] = CreateInstanceActor(instance);
|
||||
CreateNewInstance(Instance.Offline(instanceConfiguration, entity.LaunchAutomatically));
|
||||
}
|
||||
}
|
||||
|
||||
private ImmutableArray<Instance> Register(RegisterCommand command) {
|
||||
connection?.Close();
|
||||
|
||||
configuration = command.Configuration;
|
||||
connection = command.Connection;
|
||||
connection.UpdateConnection(command.Connection, configuration.AgentName);
|
||||
|
||||
lastPingTime = DateTimeOffset.Now;
|
||||
isOnline = true;
|
||||
|
||||
NotifyAgentUpdated();
|
||||
|
||||
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.Name, configuration.Guid);
|
||||
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
|
||||
|
||||
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
return instanceDataByGuid.Values.ToImmutableArray();
|
||||
}
|
||||
|
||||
private void Unregister(UnregisterCommand command) {
|
||||
if (connection?.IsSame(command.Connection) == true) {
|
||||
connection.Close();
|
||||
|
||||
if (connection.CloseIfSame(command.Connection)) {
|
||||
stats = null;
|
||||
connection = null;
|
||||
lastPingTime = null;
|
||||
isOnline = false;
|
||||
NotifyAgentUpdated();
|
||||
@ -187,7 +213,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
instance.Tell(setStatusCommand);
|
||||
}
|
||||
|
||||
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.Name, configuration.Guid);
|
||||
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,7 +222,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
isOnline = false;
|
||||
NotifyAgentUpdated();
|
||||
|
||||
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.Name, configuration.Guid);
|
||||
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -216,125 +242,86 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
|
||||
private void UpdateJavaRuntimes(UpdateJavaRuntimesCommand command) {
|
||||
javaRuntimes = command.JavaRuntimes;
|
||||
controllerState.UpdateAgentJavaRuntimes(configuration.Guid, javaRuntimes);
|
||||
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
|
||||
}
|
||||
|
||||
private Task<object?> SendMessage(SendMessageCommand command) {
|
||||
return SendMessage(command.SendMessageViaConnection);
|
||||
}
|
||||
|
||||
private async Task<object?> SendMessage(Func<AgentConnection, Task<object>> commandSendMessageViaConnection) {
|
||||
if (connection == null) {
|
||||
// TODO handle missing agent?
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return await commandSendMessageViaConnection(connection);
|
||||
} catch (Exception e) {
|
||||
Logger.Error(e, "Could not send message to agent \"{Name}\" (GUID {Guid}).", configuration.Name, configuration.Guid);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<TReply?> SendMessage<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
return (TReply?) await SendMessage(async conn => await conn.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken));
|
||||
}
|
||||
|
||||
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
|
||||
private async Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
|
||||
private async Task CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2) {
|
||||
var instanceConfiguration = command.Configuration;
|
||||
var instanceName = instanceConfiguration.InstanceName;
|
||||
var instanceGuid = instanceConfiguration.InstanceGuid;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(instanceName)) {
|
||||
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty);
|
||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
|
||||
return;
|
||||
}
|
||||
|
||||
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
|
||||
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero);
|
||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
|
||||
return;
|
||||
}
|
||||
|
||||
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
|
||||
if (serverExecutableInfo == null) {
|
||||
return InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound);
|
||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
|
||||
return;
|
||||
}
|
||||
|
||||
bool isExistingInstance = instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
|
||||
if (isExistingInstance) {
|
||||
instanceActorRef.Tell(new InstanceActor.SetConfigurationCommand(instanceConfiguration));
|
||||
bool isNewInstance = !instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
|
||||
if (isNewInstance) {
|
||||
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
|
||||
}
|
||||
|
||||
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isNewInstance);
|
||||
taskManager.Run("Configure instance " + instanceGuid, async () => await ConfigureInstance(instanceActorRef, configureInstanceCommand, configuration.AgentName, result2, cancellationToken));
|
||||
}
|
||||
|
||||
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
|
||||
private static async Task ConfigureInstance(ActorRef<InstanceActor.ICommand> instanceActorRef, InstanceActor.ConfigureInstanceCommand command, string agentName, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2, CancellationToken cancellationToken) {
|
||||
var instanceName = command.Configuration.InstanceName;
|
||||
var instanceGuid = command.Configuration.InstanceGuid;
|
||||
|
||||
var result = await instanceActorRef.Request(command, cancellationToken);
|
||||
|
||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||
if (command.IsNewInstance) {
|
||||
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
|
||||
}
|
||||
else {
|
||||
instanceActorByGuid.Add(instanceGuid, CreateInstanceActor(Instance.Offline(instanceConfiguration)));
|
||||
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (command.IsNewInstance) {
|
||||
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
else {
|
||||
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
}
|
||||
|
||||
var message = new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo));
|
||||
var reply = await SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(message, TimeSpan.FromSeconds(10), cancellationToken);
|
||||
|
||||
var result = reply.DidNotReplyIfNull().Map(static result => result switch {
|
||||
result2.SetResult(result.Map(static result => result switch {
|
||||
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
|
||||
_ => CreateOrUpdateInstanceResult.UnknownError
|
||||
});
|
||||
|
||||
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(instanceGuid);
|
||||
entity.AgentGuid = instanceConfiguration.AgentGuid;
|
||||
entity.InstanceName = instanceConfiguration.InstanceName;
|
||||
entity.ServerPort = instanceConfiguration.ServerPort;
|
||||
entity.RconPort = instanceConfiguration.RconPort;
|
||||
entity.MinecraftVersion = instanceConfiguration.MinecraftVersion;
|
||||
entity.MinecraftServerKind = instanceConfiguration.MinecraftServerKind;
|
||||
entity.MemoryAllocation = instanceConfiguration.MemoryAllocation;
|
||||
entity.JavaRuntimeGuid = instanceConfiguration.JavaRuntimeGuid;
|
||||
entity.JvmArguments = JvmArgumentsHelper.Join(instanceConfiguration.JvmArguments);
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
if (isExistingInstance) {
|
||||
auditLogWriter.InstanceEdited(instanceGuid);
|
||||
}
|
||||
else {
|
||||
auditLogWriter.InstanceCreated(instanceGuid);
|
||||
}));
|
||||
}
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
|
||||
if (isExistingInstance) {
|
||||
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, configuration.Name);
|
||||
}
|
||||
else {
|
||||
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, configuration.Name);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (isExistingInstance) {
|
||||
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
else {
|
||||
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.Name, result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
|
||||
}
|
||||
|
||||
return result;
|
||||
private void LaunchInstance(LaunchInstanceCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
|
||||
}
|
||||
|
||||
private void TellInstance(TellInstanceCommand command) {
|
||||
if (instanceActorByGuid.TryGetValue(command.InstanceGuid, out var instance)) {
|
||||
instance.Forward(command.Command);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.Command.GetType().Name, command.InstanceGuid);
|
||||
}
|
||||
private void StopInstance(StopInstanceCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
|
||||
}
|
||||
|
||||
private void UpdateInstanceData(UpdateInstanceDataCommand command) {
|
||||
private void SendMinecraftCommand(SendMinecraftCommandCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(command.AuditLogUserGuid, command.Command));
|
||||
}
|
||||
|
||||
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
|
||||
UpdateInstanceData(command.Instance);
|
||||
}
|
||||
}
|
||||
|
||||
static class AgentActorExtensions {
|
||||
internal static async Task<TReply?> SendMessage<TMessage, TReply>(this ActorRef<AgentActor.ICommand> agentActor, TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
return (TReply?) await agentActor.Request(new AgentActor.SendMessageCommand(async conn => await conn.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken)), cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@ -1,28 +1,65 @@
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
||||
sealed class AgentConnection {
|
||||
private readonly RpcConnectionToClient<IMessageToAgentListener> connection;
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<AgentConnection>();
|
||||
|
||||
internal AgentConnection(RpcConnectionToClient<IMessageToAgentListener> connection) {
|
||||
this.connection = connection;
|
||||
private readonly Guid agentGuid;
|
||||
private string agentName;
|
||||
|
||||
private RpcConnectionToClient<IMessageToAgentListener>? connection;
|
||||
|
||||
public AgentConnection(Guid agentGuid, string agentName) {
|
||||
this.agentName = agentName;
|
||||
this.agentGuid = agentGuid;
|
||||
}
|
||||
|
||||
public bool IsSame(RpcConnectionToClient<IMessageToAgentListener> connection) {
|
||||
return this.connection.IsSame(connection);
|
||||
public void UpdateConnection(RpcConnectionToClient<IMessageToAgentListener> newConnection, string newAgentName) {
|
||||
lock (this) {
|
||||
connection?.Close();
|
||||
connection = newConnection;
|
||||
agentName = newAgentName;
|
||||
}
|
||||
}
|
||||
|
||||
public void Close() {
|
||||
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgentListener> expected) {
|
||||
lock (this) {
|
||||
if (connection != null && connection.IsSame(expected)) {
|
||||
connection.Close();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||
return connection.Send(message);
|
||||
lock (this) {
|
||||
if (connection == null) {
|
||||
LogAgentOffline();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken);
|
||||
return connection.Send(message);
|
||||
}
|
||||
}
|
||||
|
||||
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||
lock (this) {
|
||||
if (connection == null) {
|
||||
LogAgentOffline();
|
||||
return Task.FromResult<TReply?>(default);
|
||||
}
|
||||
|
||||
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken)!;
|
||||
}
|
||||
}
|
||||
|
||||
private void LogAgentOffline() {
|
||||
Logger.Error("Could not send message to offline agent \"{Name}\" (GUID {Guid}).", agentName, agentGuid);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,82 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
||||
sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.ICommand> {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<AgentDatabaseStorageActor>();
|
||||
|
||||
public readonly record struct Init(Guid AgentGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new AgentDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||
}
|
||||
|
||||
private readonly Guid agentGuid;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private StoreAgentCommand? lastStoreCommand;
|
||||
private bool hasScheduledFlush;
|
||||
|
||||
private AgentDatabaseStorageActor(Init init) {
|
||||
this.agentGuid = init.AgentGuid;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
Receive<StoreAgentCommand>(StoreAgent);
|
||||
ReceiveAsync<FlushChangesCommand>(FlushChanges);
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record StoreAgentCommand(string Name, ushort ProtocolVersion, string BuildVersion, ushort MaxInstances, RamAllocationUnits MaxMemory) : ICommand;
|
||||
|
||||
private sealed record FlushChangesCommand : ICommand;
|
||||
|
||||
private void StoreAgent(StoreAgentCommand command) {
|
||||
this.lastStoreCommand = command;
|
||||
ScheduleFlush(TimeSpan.FromSeconds(2));
|
||||
}
|
||||
|
||||
private async Task FlushChanges(FlushChangesCommand command) {
|
||||
hasScheduledFlush = false;
|
||||
|
||||
if (lastStoreCommand == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await using var ctx = dbProvider.Eager();
|
||||
var entity = ctx.AgentUpsert.Fetch(agentGuid);
|
||||
|
||||
entity.Name = lastStoreCommand.Name;
|
||||
entity.ProtocolVersion = lastStoreCommand.ProtocolVersion;
|
||||
entity.BuildVersion = lastStoreCommand.BuildVersion;
|
||||
entity.MaxInstances = lastStoreCommand.MaxInstances;
|
||||
entity.MaxMemory = lastStoreCommand.MaxMemory;
|
||||
|
||||
await ctx.SaveChangesAsync(cancellationToken);
|
||||
} catch (Exception e) {
|
||||
ScheduleFlush(TimeSpan.FromSeconds(10));
|
||||
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
|
||||
return;
|
||||
}
|
||||
|
||||
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
|
||||
|
||||
lastStoreCommand = null;
|
||||
}
|
||||
|
||||
private void ScheduleFlush(TimeSpan delay) {
|
||||
if (hasScheduledFlush) {
|
||||
return;
|
||||
}
|
||||
|
||||
hasScheduledFlush = true;
|
||||
Context.System.Scheduler.ScheduleTellOnce(delay, Self, new FlushChangesCommand(), Self);
|
||||
}
|
||||
}
|
@ -11,7 +11,6 @@ using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
@ -45,7 +44,7 @@ sealed class AgentManager {
|
||||
|
||||
private ActorRef<AgentActor.ICommand> CreateAgentActor(AgentConfiguration agentConfiguration) {
|
||||
var init = new AgentActor.Init(agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
|
||||
var name = "Agent:" + agentConfiguration.Guid;
|
||||
var name = "Agent:" + agentConfiguration.AgentGuid;
|
||||
return actorSystem.ActorOf(AgentActor.Factory(init), name);
|
||||
}
|
||||
|
||||
@ -56,7 +55,7 @@ sealed class AgentManager {
|
||||
var agentProperties = new AgentConfiguration(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
|
||||
|
||||
if (agentsByGuid.TryAdd(entity.AgentGuid, CreateAgentActor(agentProperties))) {
|
||||
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentProperties.Name, agentProperties.Guid);
|
||||
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentProperties.AgentName, agentProperties.AgentGuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -68,30 +67,13 @@ sealed class AgentManager {
|
||||
}
|
||||
|
||||
var agentProperties = AgentConfiguration.From(agentInfo);
|
||||
var updateAgentInDatabaseTask = UpdateAgentInDatabase(agentInfo);
|
||||
|
||||
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.Guid, addAgentActorFactory, agentProperties);
|
||||
var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, new AgentConnection(connection)), cancellationToken);
|
||||
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
||||
var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
||||
await connection.Send(new RegisterAgentSuccessMessage(await GetInstanceConfigurationsForAgent(agentInstances)));
|
||||
|
||||
await updateAgentInDatabaseTask;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task UpdateAgentInDatabase(AgentInfo agentInfo) {
|
||||
await using var ctx = dbProvider.Eager();
|
||||
var entity = ctx.AgentUpsert.Fetch(agentInfo.Guid);
|
||||
|
||||
entity.Name = agentInfo.Name;
|
||||
entity.ProtocolVersion = agentInfo.ProtocolVersion;
|
||||
entity.BuildVersion = agentInfo.BuildVersion;
|
||||
entity.MaxInstances = agentInfo.MaxInstances;
|
||||
entity.MaxMemory = agentInfo.MaxMemory;
|
||||
|
||||
await ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(ImmutableArray<Instance> instances) {
|
||||
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
|
||||
|
||||
@ -114,19 +96,12 @@ sealed class AgentManager {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid auditLogUserGuid, InstanceConfiguration configuration) {
|
||||
if (!agentsByGuid.TryGetValue(configuration.AgentGuid, out var agent)) {
|
||||
return InstanceActionResult.General<CreateOrUpdateInstanceResult>(InstanceActionGeneralResult.AgentDoesNotExist);
|
||||
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
||||
return await agent.Request(command, cancellationToken);
|
||||
}
|
||||
|
||||
return await agent.Request(new AgentActor.CreateOrUpdateInstanceCommand(auditLogUserGuid, configuration), cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<TReply>> AskInstance<TCommand, TReply>(Guid agentGuid, Guid instanceGuid, TCommand command) where TCommand : class, InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
if (!agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
||||
else {
|
||||
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
|
||||
}
|
||||
|
||||
return await agent.Ask<InstanceActionResult<TReply>>(new AgentActor.TellInstanceCommand(instanceGuid, command), cancellationToken);
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ sealed class ControllerState {
|
||||
public ObservableState<ImmutableDictionary<Guid, Instance>>.Receiver InstancesByGuidReceiver => instancesByGuid.ReceiverSide;
|
||||
|
||||
public void UpdateAgent(Agent agent) {
|
||||
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.Configuration.Guid, agent), agent);
|
||||
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.Configuration.AgentGuid, agent), agent);
|
||||
}
|
||||
|
||||
public void UpdateAgentJavaRuntimes(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
|
||||
|
@ -2,9 +2,11 @@
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Minecraft;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Database.Entities;
|
||||
using Phantom.Controller.Database.Repositories;
|
||||
using Phantom.Controller.Services.Agents;
|
||||
using Phantom.Utils.Actor;
|
||||
@ -12,13 +14,14 @@ using Phantom.Utils.Actor;
|
||||
namespace Phantom.Controller.Services.Instances;
|
||||
|
||||
sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
public readonly record struct Init(Instance Instance, ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new InstanceActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||
}
|
||||
|
||||
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
|
||||
private readonly AgentConnection agentConnection;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
@ -30,6 +33,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
|
||||
private InstanceActor(Init init) {
|
||||
this.agentActorRef = init.AgentActorRef;
|
||||
this.agentConnection = init.AgentConnection;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
@ -38,8 +42,8 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
this.status = instance.Status;
|
||||
this.launchAutomatically = instance.LaunchAutomatically;
|
||||
|
||||
Receive<SetConfigurationCommand>(SetConfiguration);
|
||||
Receive<SetStatusCommand>(SetStatus);
|
||||
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
|
||||
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
||||
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
||||
ReceiveAsyncAndReply<SendMinecraftCommandCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
||||
@ -47,36 +51,66 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
|
||||
private void NotifyInstanceUpdated() {
|
||||
var instance = new Instance(configuration, status, launchAutomatically);
|
||||
agentActorRef.Tell(new AgentActor.UpdateInstanceDataCommand(instance));
|
||||
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(instance));
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
|
||||
var reply = await agentActorRef.SendMessage<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
|
||||
var reply = await agentConnection.Send<TMessage, InstanceActionResult<TReply>>(message, TimeSpan.FromSeconds(10), cancellationToken);
|
||||
return reply.DidNotReplyIfNull();
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record SetConfigurationCommand(InstanceConfiguration Configuration) : ICommand;
|
||||
|
||||
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
|
||||
|
||||
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsNewInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
|
||||
|
||||
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||
|
||||
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||
|
||||
public sealed record SendMinecraftCommandCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||
|
||||
private void SetConfiguration(SetConfigurationCommand command) {
|
||||
configuration = command.Configuration;
|
||||
NotifyInstanceUpdated();
|
||||
}
|
||||
|
||||
private void SetStatus(SetStatusCommand command) {
|
||||
status = command.Status;
|
||||
NotifyInstanceUpdated();
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<ConfigureInstanceResult>> ConfigureInstance(ConfigureInstanceCommand command) {
|
||||
var message = new ConfigureInstanceMessage(command.Configuration, command.LaunchProperties);
|
||||
var result = await SendInstanceActionMessage<ConfigureInstanceMessage, ConfigureInstanceResult>(message);
|
||||
|
||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||
configuration = command.Configuration;
|
||||
NotifyInstanceUpdated();
|
||||
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
|
||||
entity.AgentGuid = configuration.AgentGuid;
|
||||
entity.InstanceName = configuration.InstanceName;
|
||||
entity.ServerPort = configuration.ServerPort;
|
||||
entity.RconPort = configuration.RconPort;
|
||||
entity.MinecraftVersion = configuration.MinecraftVersion;
|
||||
entity.MinecraftServerKind = configuration.MinecraftServerKind;
|
||||
entity.MemoryAllocation = configuration.MemoryAllocation;
|
||||
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
|
||||
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
if (command.IsNewInstance) {
|
||||
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
|
||||
}
|
||||
else {
|
||||
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
|
||||
}
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
|
||||
var message = new LaunchInstanceMessage(InstanceGuid);
|
||||
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(message);
|
||||
|
@ -0,0 +1,9 @@
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
namespace Phantom.Controller.Services.Instances;
|
||||
|
||||
sealed class InstanceDatabaseStorageActor : ReceiveActor<InstanceDatabaseStorageActor.ICommand> {
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record StoreInstanceCommand() : ICommand;
|
||||
}
|
@ -30,13 +30,13 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
|
||||
}
|
||||
|
||||
public async Task<NoReply> HandleRegisterAgent(RegisterAgentMessage message) {
|
||||
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.Guid) {
|
||||
if (agentGuidWaiter.Task.IsCompleted && agentGuidWaiter.Task.Result != message.AgentInfo.AgentGuid) {
|
||||
connection.SetAuthorizationResult(false);
|
||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentFailure.ConnectionAlreadyHasAnAgent));
|
||||
}
|
||||
else if (await agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection)) {
|
||||
connection.SetAuthorizationResult(true);
|
||||
agentGuidWaiter.SetResult(message.AgentInfo.Guid);
|
||||
agentGuidWaiter.SetResult(message.AgentInfo.AgentGuid);
|
||||
}
|
||||
|
||||
return NoReply.Instance;
|
||||
@ -71,7 +71,7 @@ public sealed class AgentMessageListener : IMessageToControllerListener {
|
||||
}
|
||||
|
||||
public async Task<NoReply> HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
|
||||
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.TellInstanceCommand(message.InstanceGuid, new InstanceActor.SetStatusCommand(message.InstanceStatus)));
|
||||
agentManager.TellAgent(await WaitForAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
|
||||
return NoReply.Instance;
|
||||
}
|
||||
|
||||
|
@ -187,19 +187,19 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
|
||||
return agentManager.CreateOrUpdateInstance(message.LoggedInUserGuid, message.Configuration);
|
||||
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.Configuration));
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
|
||||
return agentManager.AskInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, message.InstanceGuid, new InstanceActor.LaunchInstanceCommand(message.LoggedInUserGuid));
|
||||
return agentManager.DoInstanceAction<AgentActor.LaunchInstanceCommand, LaunchInstanceResult>(message.AgentGuid, new AgentActor.LaunchInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid));
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<StopInstanceResult>> HandleStopInstance(StopInstanceMessage message) {
|
||||
return agentManager.AskInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, message.InstanceGuid, new InstanceActor.StopInstanceCommand(message.LoggedInUserGuid, message.StopStrategy));
|
||||
return agentManager.DoInstanceAction<AgentActor.StopInstanceCommand, StopInstanceResult>(message.AgentGuid, new AgentActor.StopInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.StopStrategy));
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
||||
return agentManager.AskInstance<InstanceActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, message.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(message.LoggedInUserGuid, message.Command));
|
||||
return agentManager.DoInstanceAction<AgentActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendMinecraftCommandCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
|
||||
}
|
||||
|
||||
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
|
||||
|
@ -9,6 +9,10 @@ public readonly struct ActorRef<TMessage> {
|
||||
this.actorRef = actorRef;
|
||||
}
|
||||
|
||||
internal bool IsSame<TOtherMessage>(ActorRef<TOtherMessage> other) {
|
||||
return actorRef.Equals(other.actorRef);
|
||||
}
|
||||
|
||||
public void Tell(TMessage message) {
|
||||
actorRef.Tell(message);
|
||||
}
|
||||
@ -17,14 +21,6 @@ public readonly struct ActorRef<TMessage> {
|
||||
actorRef.Forward(message);
|
||||
}
|
||||
|
||||
public Task<TReply> Ask<TReply>(TMessage message, TimeSpan? timeout, CancellationToken cancellationToken = default) {
|
||||
return actorRef.Ask<TReply>(message, timeout, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TReply> Ask<TReply>(TMessage message, CancellationToken cancellationToken) {
|
||||
return Ask<TReply>(message, timeout: null, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TReply> Request<TReply>(ICanReply<TReply> message, TimeSpan? timeout, CancellationToken cancellationToken = default) {
|
||||
return actorRef.Ask<TReply>(message, timeout, cancellationToken);
|
||||
}
|
||||
@ -32,8 +28,4 @@ public readonly struct ActorRef<TMessage> {
|
||||
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
|
||||
return Request(message, timeout: null, cancellationToken);
|
||||
}
|
||||
|
||||
internal bool IsSame<TOtherMessage>(ActorRef<TOtherMessage> other) {
|
||||
return actorRef.Equals(other.actorRef);
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,10 @@ public abstract class ReceiveActor<TMessage> : ReceiveActor {
|
||||
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
|
||||
}
|
||||
|
||||
protected void ReceiveAsyncAndReplyLater<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
||||
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
|
||||
}
|
||||
|
||||
private void HandleMessageWithReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
||||
try {
|
||||
Sender.Tell(action(message), Self);
|
||||
@ -28,4 +32,20 @@ public abstract class ReceiveActor<TMessage> : ReceiveActor {
|
||||
Sender.Tell(new Status.Failure(e), Self);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
||||
var taskCompletionSource = new TaskCompletionSource<TReply>();
|
||||
var sender = Sender;
|
||||
|
||||
taskCompletionSource.Task.ContinueWith(task => ((ICanTell) sender).Tell())
|
||||
try {
|
||||
Sender.Tell(await action(message), Self);
|
||||
} catch (Exception e) {
|
||||
Sender.Tell(new Status.Failure(e), Self);
|
||||
}
|
||||
}
|
||||
|
||||
protected void CompleteWith<TReply>(TaskCompletionSource<TReply> taskCompletionSource, Func<Task<TReply>> task) {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,6 @@ public sealed class AgentManager {
|
||||
}
|
||||
|
||||
public ImmutableDictionary<Guid, Agent> ToDictionaryByGuid() {
|
||||
return agents.Value.ToImmutableDictionary(static agent => agent.Configuration.Guid);
|
||||
return agents.Value.ToImmutableDictionary(static agent => agent.Configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
@ -23,8 +23,8 @@
|
||||
var usedMemory = agent.Stats?.RunningInstanceMemory.InMegabytes;
|
||||
}
|
||||
<Cell>
|
||||
<p class="fw-semibold">@configuration.Name</p>
|
||||
<small class="font-monospace text-uppercase">@configuration.Guid.ToString()</small>
|
||||
<p class="fw-semibold">@configuration.AgentName</p>
|
||||
<small class="font-monospace text-uppercase">@configuration.AgentGuid.ToString()</small>
|
||||
</Cell>
|
||||
<Cell class="text-end">
|
||||
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@configuration.MaxInstances">
|
||||
@ -72,8 +72,8 @@
|
||||
|
||||
protected override void OnInitialized() {
|
||||
AgentManager.AgentsChanged.Subscribe(this, agents => {
|
||||
var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.Name.CompareTo(a2.Configuration.Name));
|
||||
agentTable.UpdateFrom(sortedAgents, static agent => agent.Configuration.Guid, static agent => agent, static (agent, _) => agent);
|
||||
var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.AgentName.CompareTo(a2.Configuration.AgentName));
|
||||
agentTable.UpdateFrom(sortedAgents, static agent => agent.Configuration.AgentGuid, static agent => agent, static (agent, _) => agent);
|
||||
InvokeAsync(StateHasChanged);
|
||||
});
|
||||
}
|
||||
|
@ -61,7 +61,7 @@
|
||||
|
||||
try {
|
||||
logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken);
|
||||
agentNamesByGuid = AgentManager.GetAll().Select(static agent => agent.Configuration).ToImmutableDictionary(static kvp => kvp.Guid, static kvp => kvp.Name);
|
||||
agentNamesByGuid = AgentManager.GetAll().Select(static agent => agent.Configuration).ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.AgentName);
|
||||
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
|
||||
} finally {
|
||||
initializationCancellationTokenSource.Dispose();
|
||||
|
@ -66,7 +66,7 @@
|
||||
|
||||
protected override void OnInitialized() {
|
||||
AgentManager.AgentsChanged.Subscribe(this, agents => {
|
||||
this.agentNamesByGuid = agents.Select(static agent => agent.Configuration).ToImmutableDictionary(static agent => agent.Guid, static agent => agent.Name);
|
||||
this.agentNamesByGuid = agents.Select(static agent => agent.Configuration).ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.AgentName);
|
||||
InvokeAsync(StateHasChanged);
|
||||
});
|
||||
|
||||
|
@ -28,8 +28,8 @@
|
||||
@{
|
||||
static RenderFragment GetAgentOption(Agent agent) {
|
||||
var configuration = agent.Configuration;
|
||||
return @<option value="@configuration.Guid">
|
||||
@configuration.Name
|
||||
return @<option value="@configuration.AgentGuid">
|
||||
@configuration.AgentName
|
||||
•
|
||||
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(configuration.MaxInstances) @(configuration.MaxInstances == 1 ? "Instance" : "Instances")
|
||||
•
|
||||
@ -40,7 +40,7 @@
|
||||
@if (EditedInstanceConfiguration == null) {
|
||||
<FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid">
|
||||
<option value="" selected>Select which agent will run the instance...</option>
|
||||
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.Name)) {
|
||||
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.AgentName)) {
|
||||
@GetAgentOption(agent)
|
||||
}
|
||||
</FormSelectInput>
|
||||
@ -329,7 +329,7 @@
|
||||
}
|
||||
|
||||
var instance = new InstanceConfiguration(
|
||||
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Configuration.Guid,
|
||||
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Configuration.AgentGuid,
|
||||
EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(),
|
||||
form.InstanceName,
|
||||
(ushort) form.ServerPort,
|
||||
|
Loading…
Reference in New Issue
Block a user