mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 07:42:58 +01:00
Compare commits
2 Commits
dcfe66a337
...
ebc2db9c49
Author | SHA1 | Date | |
---|---|---|---|
ebc2db9c49 | |||
71acce3123 |
@ -17,7 +17,6 @@ sealed class Instance : IAsyncDisposable {
|
||||
private IServerLauncher Launcher { get; set; }
|
||||
private readonly SemaphoreSlim configurationSemaphore = new (1, 1);
|
||||
|
||||
private readonly Guid instanceGuid;
|
||||
private readonly string shortName;
|
||||
private readonly ILogger logger;
|
||||
|
||||
@ -30,8 +29,7 @@ sealed class Instance : IAsyncDisposable {
|
||||
|
||||
private readonly InstanceProcedureManager procedureManager;
|
||||
|
||||
public Instance(Guid instanceGuid, string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
|
||||
this.instanceGuid = instanceGuid;
|
||||
public Instance(string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
|
||||
this.shortName = shortName;
|
||||
this.logger = PhantomLogger.Create<Instance>(shortName);
|
||||
|
||||
@ -46,16 +44,16 @@ sealed class Instance : IAsyncDisposable {
|
||||
}
|
||||
|
||||
public void ReportLastStatus() {
|
||||
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, currentStatus));
|
||||
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
|
||||
}
|
||||
|
||||
private void ReportAndSetStatus(IInstanceStatus status) {
|
||||
currentStatus = status;
|
||||
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, status));
|
||||
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
|
||||
}
|
||||
|
||||
private void ReportEvent(IInstanceEvent instanceEvent) {
|
||||
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, instanceGuid, instanceEvent));
|
||||
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent));
|
||||
}
|
||||
|
||||
internal void TransitionState(IInstanceState newState) {
|
||||
@ -101,7 +99,7 @@ sealed class Instance : IAsyncDisposable {
|
||||
|
||||
await configurationSemaphore.WaitAsync(cancellationToken);
|
||||
try {
|
||||
procedure = new LaunchInstanceProcedure(instanceGuid, Configuration, Launcher);
|
||||
procedure = new LaunchInstanceProcedure(Configuration, Launcher);
|
||||
} finally {
|
||||
configurationSemaphore.Release();
|
||||
}
|
||||
|
@ -75,8 +75,9 @@ sealed class InstanceSessionManager : IAsyncDisposable {
|
||||
});
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(Guid instanceGuid, InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
|
||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
|
||||
return await AcquireSemaphoreAndRun(async () => {
|
||||
var instanceGuid = configuration.InstanceGuid;
|
||||
var instanceFolder = Path.Combine(basePath, instanceGuid.ToString());
|
||||
Directories.Create(instanceFolder, Chmod.URWX_GRX);
|
||||
|
||||
@ -105,15 +106,15 @@ sealed class InstanceSessionManager : IAsyncDisposable {
|
||||
|
||||
if (instances.TryGetValue(instanceGuid, out var instance)) {
|
||||
await instance.Reconfigure(configuration, launcher, shutdownCancellationToken);
|
||||
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
|
||||
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
|
||||
|
||||
if (alwaysReportStatus) {
|
||||
instance.ReportLastStatus();
|
||||
}
|
||||
}
|
||||
else {
|
||||
instances[instanceGuid] = instance = new Instance(instanceGuid, GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
|
||||
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
|
||||
instances[instanceGuid] = instance = new Instance(GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
|
||||
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
|
||||
|
||||
instance.ReportLastStatus();
|
||||
instance.IsRunningChanged += OnInstanceIsRunningChanged;
|
||||
|
@ -6,7 +6,7 @@ using Phantom.Common.Data.Instance;
|
||||
|
||||
namespace Phantom.Agent.Services.Instances.Procedures;
|
||||
|
||||
sealed record LaunchInstanceProcedure(Guid InstanceGuid, InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
|
||||
sealed record LaunchInstanceProcedure(InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
|
||||
public async Task<IInstanceState?> Run(IInstanceContext context, CancellationToken cancellationToken) {
|
||||
if (!IsRestarting && context.CurrentState is InstanceRunningState) {
|
||||
return null;
|
||||
@ -30,7 +30,7 @@ sealed record LaunchInstanceProcedure(Guid InstanceGuid, InstanceConfiguration C
|
||||
context.Logger.Information("Session starting...");
|
||||
try {
|
||||
InstanceProcess process = await DoLaunch(context, cancellationToken);
|
||||
return new InstanceRunningState(InstanceGuid, Configuration, Launcher, process, context);
|
||||
return new InstanceRunningState(Configuration, Launcher, process, context);
|
||||
} catch (OperationCanceledException) {
|
||||
context.SetStatus(InstanceStatus.NotRunning);
|
||||
} catch (LaunchFailureException e) {
|
||||
|
@ -12,7 +12,6 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
|
||||
|
||||
internal bool IsStopping { get; set; }
|
||||
|
||||
private readonly Guid instanceGuid;
|
||||
private readonly InstanceConfiguration configuration;
|
||||
private readonly IServerLauncher launcher;
|
||||
private readonly IInstanceContext context;
|
||||
@ -22,14 +21,13 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
|
||||
|
||||
private bool isDisposed;
|
||||
|
||||
public InstanceRunningState(Guid instanceGuid, InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
|
||||
this.instanceGuid = instanceGuid;
|
||||
public InstanceRunningState(InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
|
||||
this.configuration = configuration;
|
||||
this.launcher = launcher;
|
||||
this.context = context;
|
||||
this.Process = process;
|
||||
|
||||
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, instanceGuid, context.ShortName);
|
||||
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
|
||||
|
||||
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
|
||||
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;
|
||||
@ -66,7 +64,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
|
||||
else {
|
||||
context.Logger.Information("Session ended unexpectedly, restarting...");
|
||||
context.ReportEvent(InstanceEvent.Crashed);
|
||||
context.EnqueueProcedure(new LaunchInstanceProcedure(instanceGuid, configuration, launcher, IsRestarting: true));
|
||||
context.EnqueueProcedure(new LaunchInstanceProcedure(configuration, launcher, IsRestarting: true));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,15 +27,15 @@ public sealed class MessageListener : IMessageToAgentListener {
|
||||
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
|
||||
Logger.Information("Agent authentication successful.");
|
||||
|
||||
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
|
||||
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, instanceGuid);
|
||||
void ShutdownAfterConfigurationFailed(InstanceConfiguration configuration) {
|
||||
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, configuration.InstanceGuid);
|
||||
shutdownTokenSource.Cancel();
|
||||
}
|
||||
|
||||
foreach (var configureInstanceMessage in message.InitialInstanceConfigurations) {
|
||||
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
|
||||
if (!result.Is(ConfigureInstanceResult.Success)) {
|
||||
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
|
||||
ShutdownAfterConfigurationFailed(configureInstanceMessage.Configuration);
|
||||
return NoReply.Instance;
|
||||
}
|
||||
}
|
||||
@ -64,7 +64,7 @@ public sealed class MessageListener : IMessageToAgentListener {
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
|
||||
return agent.InstanceSessionManager.Configure(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
|
||||
return agent.InstanceSessionManager.Configure(message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {
|
||||
|
@ -5,10 +5,9 @@ namespace Phantom.Common.Data.Web.Agent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record Agent(
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid,
|
||||
[property: MemoryPackOrder(1)] AgentConfiguration Configuration,
|
||||
[property: MemoryPackOrder(2)] AgentStats? Stats,
|
||||
[property: MemoryPackOrder(3)] IAgentConnectionStatus ConnectionStatus
|
||||
[property: MemoryPackOrder(0)] AgentConfiguration Configuration,
|
||||
[property: MemoryPackOrder(1)] AgentStats? Stats,
|
||||
[property: MemoryPackOrder(2)] IAgentConnectionStatus ConnectionStatus
|
||||
) {
|
||||
[MemoryPackIgnore]
|
||||
public RamAllocationUnits? AvailableMemory => Configuration.MaxMemory - Stats?.RunningInstanceMemory;
|
||||
|
@ -5,15 +5,16 @@ namespace Phantom.Common.Data.Web.Agent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record AgentConfiguration(
|
||||
[property: MemoryPackOrder(0)] string AgentName,
|
||||
[property: MemoryPackOrder(1)] ushort ProtocolVersion,
|
||||
[property: MemoryPackOrder(2)] string BuildVersion,
|
||||
[property: MemoryPackOrder(3)] ushort MaxInstances,
|
||||
[property: MemoryPackOrder(4)] RamAllocationUnits MaxMemory,
|
||||
[property: MemoryPackOrder(5)] AllowedPorts? AllowedServerPorts = null,
|
||||
[property: MemoryPackOrder(6)] AllowedPorts? AllowedRconPorts = null
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid,
|
||||
[property: MemoryPackOrder(1)] string AgentName,
|
||||
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
|
||||
[property: MemoryPackOrder(3)] string BuildVersion,
|
||||
[property: MemoryPackOrder(4)] ushort MaxInstances,
|
||||
[property: MemoryPackOrder(5)] RamAllocationUnits MaxMemory,
|
||||
[property: MemoryPackOrder(6)] AllowedPorts? AllowedServerPorts = null,
|
||||
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts = null
|
||||
) {
|
||||
public static AgentConfiguration From(AgentInfo agentInfo) {
|
||||
return new AgentConfiguration(agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
||||
return new AgentConfiguration(agentInfo.AgentGuid, agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
|
||||
}
|
||||
}
|
||||
|
@ -5,12 +5,11 @@ namespace Phantom.Common.Data.Web.Instance;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record Instance(
|
||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
|
||||
[property: MemoryPackOrder(2)] IInstanceStatus Status,
|
||||
[property: MemoryPackOrder(3)] bool LaunchAutomatically
|
||||
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
|
||||
[property: MemoryPackOrder(1)] IInstanceStatus Status,
|
||||
[property: MemoryPackOrder(2)] bool LaunchAutomatically
|
||||
) {
|
||||
public static Instance Offline(Guid instanceGuid, InstanceConfiguration configuration, bool launchAutomatically = false) {
|
||||
return new Instance(instanceGuid, configuration, InstanceStatus.Offline, launchAutomatically);
|
||||
public static Instance Offline(InstanceConfiguration configuration, bool launchAutomatically = false) {
|
||||
return new Instance(configuration, InstanceStatus.Offline, launchAutomatically);
|
||||
}
|
||||
}
|
||||
|
@ -7,12 +7,13 @@ namespace Phantom.Common.Data.Instance;
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record InstanceConfiguration(
|
||||
[property: MemoryPackOrder(0)] Guid AgentGuid,
|
||||
[property: MemoryPackOrder(1)] string InstanceName,
|
||||
[property: MemoryPackOrder(2)] ushort ServerPort,
|
||||
[property: MemoryPackOrder(3)] ushort RconPort,
|
||||
[property: MemoryPackOrder(4)] string MinecraftVersion,
|
||||
[property: MemoryPackOrder(5)] MinecraftServerKind MinecraftServerKind,
|
||||
[property: MemoryPackOrder(6)] RamAllocationUnits MemoryAllocation,
|
||||
[property: MemoryPackOrder(7)] Guid JavaRuntimeGuid,
|
||||
[property: MemoryPackOrder(8)] ImmutableArray<string> JvmArguments
|
||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(2)] string InstanceName,
|
||||
[property: MemoryPackOrder(3)] ushort ServerPort,
|
||||
[property: MemoryPackOrder(4)] ushort RconPort,
|
||||
[property: MemoryPackOrder(5)] string MinecraftVersion,
|
||||
[property: MemoryPackOrder(6)] MinecraftServerKind MinecraftServerKind,
|
||||
[property: MemoryPackOrder(7)] RamAllocationUnits MemoryAllocation,
|
||||
[property: MemoryPackOrder(8)] Guid JavaRuntimeGuid,
|
||||
[property: MemoryPackOrder(9)] ImmutableArray<string> JvmArguments
|
||||
);
|
||||
|
@ -6,10 +6,9 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
|
||||
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record ConfigureInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
|
||||
[property: MemoryPackOrder(2)] InstanceLaunchProperties LaunchProperties,
|
||||
[property: MemoryPackOrder(3)] bool LaunchNow = false
|
||||
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
|
||||
[property: MemoryPackOrder(1)] InstanceLaunchProperties LaunchProperties,
|
||||
[property: MemoryPackOrder(2)] bool LaunchNow = false
|
||||
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
|
||||
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||
return listener.HandleConfigureInstance(this);
|
||||
|
@ -8,8 +8,7 @@ namespace Phantom.Common.Messages.Web.ToController;
|
||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
||||
public sealed partial record CreateOrUpdateInstanceMessage(
|
||||
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
|
||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
||||
[property: MemoryPackOrder(2)] InstanceConfiguration Configuration
|
||||
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
|
||||
) : IMessageToController<InstanceActionResult<CreateOrUpdateInstanceResult>> {
|
||||
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> Accept(IMessageToControllerListener listener) {
|
||||
return listener.HandleCreateOrUpdateInstance(this);
|
||||
|
@ -5,8 +5,6 @@ using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Agent;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Java;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Data.Web.Agent;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Minecraft;
|
||||
@ -14,10 +12,8 @@ using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Actor.Mailbox;
|
||||
using Phantom.Utils.Actor.Tasks;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
using Serilog;
|
||||
@ -30,7 +26,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
|
||||
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
|
||||
|
||||
public readonly record struct Init(Guid AgentGuid, AgentConfiguration AgentConfiguration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
||||
@ -41,8 +37,6 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private readonly Guid agentGuid;
|
||||
|
||||
private AgentConfiguration configuration;
|
||||
private AgentStats? stats;
|
||||
private ImmutableArray<TaggedJavaRuntime> javaRuntimes = ImmutableArray<TaggedJavaRuntime>.Empty;
|
||||
@ -67,8 +61,8 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
}
|
||||
|
||||
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
|
||||
private readonly ActorRef<AgentInstanceRouterActor.ICommand> instanceRouterActor;
|
||||
|
||||
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
|
||||
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
|
||||
|
||||
private AgentActor(Init init) {
|
||||
@ -77,11 +71,11 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
this.agentGuid = init.AgentGuid;
|
||||
this.configuration = init.AgentConfiguration;
|
||||
this.connection = new AgentConnection(agentGuid, configuration.AgentName);
|
||||
this.configuration = init.Configuration;
|
||||
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
|
||||
|
||||
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(agentGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
|
||||
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
|
||||
this.instanceRouterActor = Context.ActorOf(AgentInstanceRouterActor.Factory(new AgentInstanceRouterActor.Init(SelfTyped, connection, minecraftVersions, dbProvider, cancellationToken)), "InstanceRouter");
|
||||
|
||||
NotifyAgentUpdated();
|
||||
|
||||
@ -92,16 +86,12 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
|
||||
Receive<UpdateStatsCommand>(UpdateStats);
|
||||
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
|
||||
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
||||
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
|
||||
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
||||
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
||||
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
||||
Receive<RouteToInstanceCommand>(RouteToInstance);
|
||||
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
|
||||
}
|
||||
|
||||
private void NotifyAgentUpdated() {
|
||||
controllerState.UpdateAgent(new Agent(agentGuid, configuration, stats, ConnectionStatus));
|
||||
controllerState.UpdateAgent(new Agent(configuration, stats, ConnectionStatus));
|
||||
}
|
||||
|
||||
protected override void PreStart() {
|
||||
@ -110,56 +100,22 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
|
||||
private void CreateNewInstance(Instance instance) {
|
||||
UpdateInstanceData(instance);
|
||||
|
||||
var instanceActor = CreateInstanceActor(instance);
|
||||
instanceActorByGuid.Add(instance.InstanceGuid, instanceActor);
|
||||
return instanceActor;
|
||||
instanceRouterActor.Tell(new AgentInstanceRouterActor.InitializeInstanceCommand(instance));
|
||||
}
|
||||
|
||||
private void UpdateInstanceData(Instance instance) {
|
||||
instanceDataByGuid[instance.InstanceGuid] = instance;
|
||||
instanceDataByGuid[instance.Configuration.InstanceGuid] = instance;
|
||||
controllerState.UpdateInstance(instance);
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
||||
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
|
||||
var name = "Instance:" + instance.InstanceGuid;
|
||||
return Context.ActorOf(InstanceActor.Factory(init), name);
|
||||
}
|
||||
|
||||
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
|
||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||
instance.Tell(command);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||
}
|
||||
}
|
||||
|
||||
private void TellAllInstances(InstanceActor.ICommand command) {
|
||||
foreach (var instance in instanceActorByGuid.Values) {
|
||||
instance.Tell(command);
|
||||
}
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||
return instance.Request(command, cancellationToken);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
|
||||
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
|
||||
|
||||
foreach (var (instanceGuid, instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
|
||||
foreach (var (instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
|
||||
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
|
||||
configurationMessages.Add(new ConfigureInstanceMessage(instanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
|
||||
configurationMessages.Add(new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
|
||||
}
|
||||
|
||||
return configurationMessages.ToImmutable();
|
||||
@ -181,23 +137,16 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
|
||||
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
|
||||
|
||||
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
|
||||
|
||||
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
|
||||
|
||||
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||
|
||||
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||
|
||||
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||
public sealed record RouteToInstanceCommand(AgentInstanceRouterActor.ICommand Command) : ICommand;
|
||||
|
||||
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
|
||||
|
||||
private async Task Initialize(InitializeCommand command) {
|
||||
await using var ctx = dbProvider.Eager();
|
||||
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == agentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
|
||||
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == configuration.AgentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
|
||||
var instanceConfiguration = new InstanceConfiguration(
|
||||
entity.AgentGuid,
|
||||
entity.InstanceGuid,
|
||||
entity.InstanceName,
|
||||
entity.ServerPort,
|
||||
entity.RconPort,
|
||||
@ -208,7 +157,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
JvmArgumentsHelper.Split(entity.JvmArguments)
|
||||
);
|
||||
|
||||
CreateNewInstance(Instance.Offline(entity.InstanceGuid, instanceConfiguration, entity.LaunchAutomatically));
|
||||
CreateNewInstance(Instance.Offline(instanceConfiguration, entity.LaunchAutomatically));
|
||||
}
|
||||
}
|
||||
|
||||
@ -222,9 +171,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
isOnline = true;
|
||||
NotifyAgentUpdated();
|
||||
|
||||
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
|
||||
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
|
||||
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentConfigurationCommand(configuration));
|
||||
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
|
||||
|
||||
return configurationMessages;
|
||||
}
|
||||
@ -236,9 +185,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
isOnline = false;
|
||||
NotifyAgentUpdated();
|
||||
|
||||
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
|
||||
instanceRouterActor.Tell(new AgentInstanceRouterActor.MarkInstancesAsOfflineCommand());
|
||||
|
||||
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
|
||||
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,7 +196,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
isOnline = false;
|
||||
NotifyAgentUpdated();
|
||||
|
||||
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
|
||||
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -267,79 +216,11 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
||||
|
||||
private void UpdateJavaRuntimes(UpdateJavaRuntimesCommand command) {
|
||||
javaRuntimes = command.JavaRuntimes;
|
||||
controllerState.UpdateAgentJavaRuntimes(agentGuid, javaRuntimes);
|
||||
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
|
||||
var instanceConfiguration = command.Configuration;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
|
||||
}
|
||||
|
||||
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
|
||||
}
|
||||
|
||||
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
|
||||
.ContinueOnActor(CreateOrUpdateInstance1, command)
|
||||
.Unwrap();
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
|
||||
if (serverExecutableInfo == null) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
|
||||
}
|
||||
|
||||
var instanceConfiguration = command.Configuration;
|
||||
|
||||
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(command.InstanceGuid, out var instanceActorRef);
|
||||
if (isCreatingInstance) {
|
||||
instanceActorRef = CreateNewInstance(Instance.Offline(command.InstanceGuid, instanceConfiguration));
|
||||
}
|
||||
|
||||
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, command.InstanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
|
||||
|
||||
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
|
||||
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
|
||||
}
|
||||
|
||||
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
|
||||
var instanceGuid = command.InstanceGuid;
|
||||
var instanceName = command.Configuration.InstanceName;
|
||||
var isCreating = command.IsCreatingInstance;
|
||||
|
||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||
string action = isCreating ? "Added" : "Edited";
|
||||
string relation = isCreating ? "to agent" : "in agent";
|
||||
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
|
||||
}
|
||||
else {
|
||||
string action = isCreating ? "adding" : "editing";
|
||||
string relation = isCreating ? "to agent" : "in agent";
|
||||
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
|
||||
return result.Map(static result => result switch {
|
||||
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
|
||||
_ => CreateOrUpdateInstanceResult.UnknownError
|
||||
});
|
||||
}
|
||||
|
||||
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
|
||||
private void RouteToInstance(RouteToInstanceCommand command) {
|
||||
instanceRouterActor.Forward(command.Command);
|
||||
}
|
||||
|
||||
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
|
||||
|
@ -1,4 +1,4 @@
|
||||
using Phantom.Common.Data.Web.Agent;
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
@ -19,7 +19,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private AgentConfiguration? configurationToStore;
|
||||
private StoreAgentCommand? lastStoreCommand;
|
||||
private bool hasScheduledFlush;
|
||||
|
||||
private AgentDatabaseStorageActor(Init init) {
|
||||
@ -27,25 +27,25 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
Receive<StoreAgentConfigurationCommand>(StoreAgentConfiguration);
|
||||
Receive<StoreAgentCommand>(StoreAgent);
|
||||
ReceiveAsync<FlushChangesCommand>(FlushChanges);
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record StoreAgentConfigurationCommand(AgentConfiguration Configuration) : ICommand;
|
||||
public sealed record StoreAgentCommand(string Name, ushort ProtocolVersion, string BuildVersion, ushort MaxInstances, RamAllocationUnits MaxMemory) : ICommand;
|
||||
|
||||
private sealed record FlushChangesCommand : ICommand;
|
||||
|
||||
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
|
||||
this.configurationToStore = command.Configuration;
|
||||
private void StoreAgent(StoreAgentCommand command) {
|
||||
this.lastStoreCommand = command;
|
||||
ScheduleFlush(TimeSpan.FromSeconds(2));
|
||||
}
|
||||
|
||||
private async Task FlushChanges(FlushChangesCommand command) {
|
||||
hasScheduledFlush = false;
|
||||
|
||||
if (configurationToStore == null) {
|
||||
if (lastStoreCommand == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -53,22 +53,22 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
|
||||
await using var ctx = dbProvider.Eager();
|
||||
var entity = ctx.AgentUpsert.Fetch(agentGuid);
|
||||
|
||||
entity.Name = configurationToStore.AgentName;
|
||||
entity.ProtocolVersion = configurationToStore.ProtocolVersion;
|
||||
entity.BuildVersion = configurationToStore.BuildVersion;
|
||||
entity.MaxInstances = configurationToStore.MaxInstances;
|
||||
entity.MaxMemory = configurationToStore.MaxMemory;
|
||||
entity.Name = lastStoreCommand.Name;
|
||||
entity.ProtocolVersion = lastStoreCommand.ProtocolVersion;
|
||||
entity.BuildVersion = lastStoreCommand.BuildVersion;
|
||||
entity.MaxInstances = lastStoreCommand.MaxInstances;
|
||||
entity.MaxMemory = lastStoreCommand.MaxMemory;
|
||||
|
||||
await ctx.SaveChangesAsync(cancellationToken);
|
||||
} catch (Exception e) {
|
||||
ScheduleFlush(TimeSpan.FromSeconds(10));
|
||||
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
|
||||
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
|
||||
return;
|
||||
}
|
||||
|
||||
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
|
||||
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
|
||||
|
||||
configurationToStore = null;
|
||||
lastStoreCommand = null;
|
||||
}
|
||||
|
||||
private void ScheduleFlush(TimeSpan delay) {
|
||||
|
@ -0,0 +1,188 @@
|
||||
using Phantom.Common.Data;
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Minecraft;
|
||||
using Phantom.Controller.Services.Instances;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Actor.Mailbox;
|
||||
using Phantom.Utils.Actor.Tasks;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
|
||||
namespace Phantom.Controller.Services.Agents;
|
||||
|
||||
sealed class AgentInstanceRouterActor : ReceiveActor<AgentInstanceRouterActor.ICommand> {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<AgentInstanceRouterActor>();
|
||||
|
||||
public readonly record struct Init(ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new AgentInstanceRouterActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
||||
}
|
||||
|
||||
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
|
||||
private readonly AgentConnection agentConnection;
|
||||
private readonly MinecraftVersions minecraftVersions;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
|
||||
|
||||
private AgentInstanceRouterActor(Init init) {
|
||||
this.agentActorRef = init.AgentActorRef;
|
||||
this.agentConnection = init.AgentConnection;
|
||||
this.minecraftVersions = init.MinecraftVersions;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
Receive<InitializeInstanceCommand>(InitializeInstance);
|
||||
Receive<MarkInstancesAsOfflineCommand>(MarkInstancesAsOffline);
|
||||
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
|
||||
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
||||
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
||||
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
||||
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
|
||||
var instanceGuid = instance.Configuration.InstanceGuid;
|
||||
|
||||
if (instanceActorByGuid.ContainsKey(instanceGuid)) {
|
||||
throw new InvalidOperationException("Instance already exists: " + instanceGuid);
|
||||
}
|
||||
|
||||
var instanceActor = CreateInstanceActor(instance);
|
||||
instanceActorByGuid.Add(instanceGuid, instanceActor);
|
||||
return instanceActor;
|
||||
}
|
||||
|
||||
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
||||
var init = new InstanceActor.Init(instance, agentActorRef, agentConnection, dbProvider, cancellationToken);
|
||||
var name = "Instance:" + instance.Configuration.InstanceGuid;
|
||||
return Context.ActorOf(InstanceActor.Factory(init), name);
|
||||
}
|
||||
|
||||
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
|
||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||
instance.Tell(command);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||
}
|
||||
}
|
||||
|
||||
private void TellAllInstances(InstanceActor.ICommand command) {
|
||||
foreach (var instance in instanceActorByGuid.Values) {
|
||||
instance.Tell(command);
|
||||
}
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||
return instance.Request(command, cancellationToken);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
|
||||
}
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record InitializeInstanceCommand(Instance Instance) : ICommand;
|
||||
|
||||
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
|
||||
|
||||
public sealed record MarkInstancesAsOfflineCommand : ICommand;
|
||||
|
||||
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
|
||||
|
||||
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||
|
||||
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||
|
||||
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||
|
||||
private void InitializeInstance(InitializeInstanceCommand command) {
|
||||
CreateNewInstance(command.Instance);
|
||||
}
|
||||
|
||||
private void MarkInstancesAsOffline(MarkInstancesAsOfflineCommand command) {
|
||||
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
|
||||
}
|
||||
|
||||
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
|
||||
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
|
||||
var instanceConfiguration = command.Configuration;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
|
||||
}
|
||||
|
||||
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
|
||||
}
|
||||
|
||||
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
|
||||
.ContinueOnActor(CreateOrUpdateInstance1, command)
|
||||
.Unwrap();
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
|
||||
if (serverExecutableInfo == null) {
|
||||
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
|
||||
}
|
||||
|
||||
var instanceConfiguration = command.Configuration;
|
||||
|
||||
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(instanceConfiguration.InstanceGuid, out var instanceActorRef);
|
||||
if (isCreatingInstance) {
|
||||
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
|
||||
}
|
||||
|
||||
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
|
||||
|
||||
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
|
||||
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
|
||||
}
|
||||
|
||||
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
|
||||
var instanceName = command.Configuration.InstanceName;
|
||||
var instanceGuid = command.Configuration.InstanceGuid;
|
||||
var isCreating = command.IsCreatingInstance;
|
||||
|
||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||
string action = isCreating ? "Added" : "Edited";
|
||||
string relation = isCreating ? "to agent" : "in agent";
|
||||
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
|
||||
}
|
||||
else {
|
||||
string action = isCreating ? "adding" : "editing";
|
||||
string relation = isCreating ? "to agent" : "in agent";
|
||||
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
||||
}
|
||||
|
||||
return result.Map(static result => result switch {
|
||||
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
|
||||
_ => CreateOrUpdateInstanceResult.UnknownError
|
||||
});
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
|
||||
}
|
||||
|
||||
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
|
||||
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
|
||||
}
|
||||
}
|
@ -36,12 +36,12 @@ sealed class AgentManager {
|
||||
this.dbProvider = dbProvider;
|
||||
this.cancellationToken = cancellationToken;
|
||||
|
||||
this.addAgentActorFactory = CreateAgentActor;
|
||||
addAgentActorFactory = (_, agent) => CreateAgentActor(agent);
|
||||
}
|
||||
|
||||
private ActorRef<AgentActor.ICommand> CreateAgentActor(Guid agentGuid, AgentConfiguration agentConfiguration) {
|
||||
var init = new AgentActor.Init(agentGuid, agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
|
||||
var name = "Agent:" + agentGuid;
|
||||
private ActorRef<AgentActor.ICommand> CreateAgentActor(AgentConfiguration agentConfiguration) {
|
||||
var init = new AgentActor.Init(agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
|
||||
var name = "Agent:" + agentConfiguration.AgentGuid;
|
||||
return actorSystem.ActorOf(AgentActor.Factory(init), name);
|
||||
}
|
||||
|
||||
@ -49,11 +49,10 @@ sealed class AgentManager {
|
||||
await using var ctx = dbProvider.Eager();
|
||||
|
||||
await foreach (var entity in ctx.Agents.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
|
||||
var agentGuid = entity.AgentGuid;
|
||||
var agentConfiguration = new AgentConfiguration(entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
|
||||
var agentProperties = new AgentConfiguration(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
|
||||
|
||||
if (agentsByGuid.TryAdd(agentGuid, CreateAgentActor(agentGuid, agentConfiguration))) {
|
||||
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentConfiguration.AgentName, agentGuid);
|
||||
if (agentsByGuid.TryAdd(entity.AgentGuid, CreateAgentActor(agentProperties))) {
|
||||
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentProperties.AgentName, agentProperties.AgentGuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -83,9 +82,9 @@ sealed class AgentManager {
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentInstanceRouterActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
||||
return await agent.Request(command, cancellationToken);
|
||||
return await agent.Request(new AgentActor.RouteToInstanceCommand(command), cancellationToken);
|
||||
}
|
||||
else {
|
||||
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
|
||||
|
@ -17,8 +17,6 @@ using Phantom.Utils.Tasks;
|
||||
namespace Phantom.Controller.Services;
|
||||
|
||||
public sealed class ControllerServices : IAsyncDisposable {
|
||||
public ActorSystem ActorSystem { get; }
|
||||
|
||||
private TaskManager TaskManager { get; }
|
||||
private ControllerState ControllerState { get; }
|
||||
private MinecraftVersions MinecraftVersions { get; }
|
||||
@ -35,6 +33,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
private UserLoginManager UserLoginManager { get; }
|
||||
private AuditLogManager AuditLogManager { get; }
|
||||
|
||||
private readonly ActorSystem actorSystem;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly AuthToken webAuthToken;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
@ -44,13 +43,13 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
this.webAuthToken = webAuthToken;
|
||||
this.cancellationToken = shutdownCancellationToken;
|
||||
|
||||
this.ActorSystem = ActorSystemFactory.Create("Controller");
|
||||
this.actorSystem = ActorSystemFactory.Create("Controller");
|
||||
|
||||
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
|
||||
this.ControllerState = new ControllerState();
|
||||
this.MinecraftVersions = new MinecraftVersions();
|
||||
|
||||
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
||||
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
||||
this.InstanceLogManager = new InstanceLogManager();
|
||||
|
||||
this.UserManager = new UserManager(dbProvider);
|
||||
@ -68,7 +67,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
}
|
||||
|
||||
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
|
||||
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
||||
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
||||
}
|
||||
|
||||
public async Task Initialize() {
|
||||
@ -79,7 +78,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync() {
|
||||
await ActorSystem.Terminate();
|
||||
ActorSystem.Dispose();
|
||||
await actorSystem.Terminate();
|
||||
actorSystem.Dispose();
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ sealed class ControllerState {
|
||||
public ObservableState<ImmutableDictionary<Guid, Instance>>.Receiver InstancesByGuidReceiver => instancesByGuid.ReceiverSide;
|
||||
|
||||
public void UpdateAgent(Agent agent) {
|
||||
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.AgentGuid, agent), agent);
|
||||
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.Configuration.AgentGuid, agent), agent);
|
||||
}
|
||||
|
||||
public void UpdateAgentJavaRuntimes(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
|
||||
@ -28,6 +28,6 @@ sealed class ControllerState {
|
||||
}
|
||||
|
||||
public void UpdateInstance(Instance instance) {
|
||||
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.InstanceGuid, instance), instance);
|
||||
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.Configuration.InstanceGuid, instance), instance);
|
||||
}
|
||||
}
|
||||
|
@ -2,9 +2,12 @@
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Replies;
|
||||
using Phantom.Common.Data.Web.Instance;
|
||||
using Phantom.Common.Data.Web.Minecraft;
|
||||
using Phantom.Common.Messages.Agent;
|
||||
using Phantom.Common.Messages.Agent.ToAgent;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Database.Entities;
|
||||
using Phantom.Controller.Database.Repositories;
|
||||
using Phantom.Controller.Services.Agents;
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
@ -19,24 +22,25 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
|
||||
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
|
||||
private readonly AgentConnection agentConnection;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private readonly Guid instanceGuid;
|
||||
private Guid InstanceGuid => configuration.InstanceGuid;
|
||||
|
||||
private InstanceConfiguration configuration;
|
||||
private IInstanceStatus status;
|
||||
private bool launchAutomatically;
|
||||
|
||||
private readonly ActorRef<InstanceDatabaseStorageActor.ICommand> databaseStorageActor;
|
||||
|
||||
private InstanceActor(Init init) {
|
||||
this.agentActorRef = init.AgentActorRef;
|
||||
this.agentConnection = init.AgentConnection;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
(this.instanceGuid, this.configuration, this.status, this.launchAutomatically) = init.Instance;
|
||||
|
||||
this.databaseStorageActor = Context.ActorOf(InstanceDatabaseStorageActor.Factory(new InstanceDatabaseStorageActor.Init(instanceGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
|
||||
var instance = init.Instance;
|
||||
this.configuration = instance.Configuration;
|
||||
this.status = instance.Status;
|
||||
this.launchAutomatically = instance.LaunchAutomatically;
|
||||
|
||||
Receive<SetStatusCommand>(SetStatus);
|
||||
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
|
||||
@ -46,14 +50,8 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
}
|
||||
|
||||
private void NotifyInstanceUpdated() {
|
||||
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
|
||||
}
|
||||
|
||||
private void SetLaunchAutomatically(bool newValue) {
|
||||
if (launchAutomatically != newValue) {
|
||||
launchAutomatically = newValue;
|
||||
NotifyInstanceUpdated();
|
||||
}
|
||||
var instance = new Instance(configuration, status, launchAutomatically);
|
||||
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(instance));
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
|
||||
@ -65,7 +63,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
|
||||
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
|
||||
|
||||
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
|
||||
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
|
||||
|
||||
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||
|
||||
@ -79,55 +77,87 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<ConfigureInstanceResult>> ConfigureInstance(ConfigureInstanceCommand command) {
|
||||
var message = new ConfigureInstanceMessage(command.InstanceGuid, command.Configuration, command.LaunchProperties);
|
||||
var message = new ConfigureInstanceMessage(command.Configuration, command.LaunchProperties);
|
||||
var result = await SendInstanceActionMessage<ConfigureInstanceMessage, ConfigureInstanceResult>(message);
|
||||
|
||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||
configuration = command.Configuration;
|
||||
NotifyInstanceUpdated();
|
||||
|
||||
var storeCommand = new InstanceDatabaseStorageActor.StoreInstanceConfigurationCommand(
|
||||
command.AuditLogUserGuid,
|
||||
command.IsCreatingInstance,
|
||||
configuration
|
||||
);
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
databaseStorageActor.Tell(storeCommand);
|
||||
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
|
||||
entity.AgentGuid = configuration.AgentGuid;
|
||||
entity.InstanceName = configuration.InstanceName;
|
||||
entity.ServerPort = configuration.ServerPort;
|
||||
entity.RconPort = configuration.RconPort;
|
||||
entity.MinecraftVersion = configuration.MinecraftVersion;
|
||||
entity.MinecraftServerKind = configuration.MinecraftServerKind;
|
||||
entity.MemoryAllocation = configuration.MemoryAllocation;
|
||||
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
|
||||
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
if (command.IsCreatingInstance) {
|
||||
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
|
||||
}
|
||||
else {
|
||||
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
|
||||
}
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
|
||||
var message = new LaunchInstanceMessage(instanceGuid);
|
||||
var message = new LaunchInstanceMessage(InstanceGuid);
|
||||
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(message);
|
||||
|
||||
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
|
||||
SetLaunchAutomatically(true);
|
||||
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceLaunchedCommand(command.AuditLogUserGuid));
|
||||
await HandleInstanceManuallyLaunchedOrStopped(true, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceLaunched(InstanceGuid));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
|
||||
var message = new StopInstanceMessage(instanceGuid, command.StopStrategy);
|
||||
var message = new StopInstanceMessage(InstanceGuid, command.StopStrategy);
|
||||
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(message);
|
||||
|
||||
if (result.Is(StopInstanceResult.StopInitiated)) {
|
||||
SetLaunchAutomatically(false);
|
||||
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceStoppedCommand(command.AuditLogUserGuid, command.StopStrategy));
|
||||
await HandleInstanceManuallyLaunchedOrStopped(false, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceStopped(InstanceGuid, command.StopStrategy.Seconds));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task HandleInstanceManuallyLaunchedOrStopped(bool wasLaunched, Guid auditLogUserGuid, Action<AuditLogRepository.ItemWriter> addAuditEvent) {
|
||||
if (launchAutomatically != wasLaunched) {
|
||||
launchAutomatically = wasLaunched;
|
||||
NotifyInstanceUpdated();
|
||||
}
|
||||
|
||||
await using var db = dbProvider.Lazy();
|
||||
var entity = await db.Ctx.Instances.FindAsync(new object[] { InstanceGuid }, cancellationToken);
|
||||
if (entity != null) {
|
||||
entity.LaunchAutomatically = wasLaunched;
|
||||
addAuditEvent(new AuditLogRepository(db).Writer(auditLogUserGuid));
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
|
||||
var message = new SendCommandToInstanceMessage(instanceGuid, command.Command);
|
||||
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
|
||||
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
|
||||
|
||||
if (result.Is(SendCommandToInstanceResult.Success)) {
|
||||
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceCommandSentCommand(command.AuditLogUserGuid, command.Command));
|
||||
await using var db = dbProvider.Lazy();
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
|
||||
auditLogWriter.InstanceCommandExecuted(InstanceGuid, command.Command);
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -1,116 +1,9 @@
|
||||
using Phantom.Common.Data.Instance;
|
||||
using Phantom.Common.Data.Minecraft;
|
||||
using Phantom.Common.Data.Web.Minecraft;
|
||||
using Phantom.Controller.Database;
|
||||
using Phantom.Controller.Database.Entities;
|
||||
using Phantom.Controller.Database.Repositories;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Serilog;
|
||||
using Phantom.Utils.Actor;
|
||||
|
||||
namespace Phantom.Controller.Services.Instances;
|
||||
|
||||
sealed class InstanceDatabaseStorageActor : ReceiveActor<InstanceDatabaseStorageActor.ICommand> {
|
||||
private static readonly ILogger Logger = PhantomLogger.Create<InstanceDatabaseStorageActor>();
|
||||
|
||||
public readonly record struct Init(Guid InstanceGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new InstanceDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||
}
|
||||
|
||||
private readonly Guid instanceGuid;
|
||||
private readonly IDbContextProvider dbProvider;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private InstanceDatabaseStorageActor(Init init) {
|
||||
this.instanceGuid = init.InstanceGuid;
|
||||
this.dbProvider = init.DbProvider;
|
||||
this.cancellationToken = init.CancellationToken;
|
||||
|
||||
ReceiveAsync<StoreInstanceConfigurationCommand>(StoreInstanceConfiguration);
|
||||
ReceiveAsync<StoreInstanceLaunchedCommand>(StoreInstanceLaunched);
|
||||
ReceiveAsync<StoreInstanceStoppedCommand>(StoreInstanceStopped);
|
||||
ReceiveAsync<StoreInstanceCommandSentCommand>(StoreInstanceCommandSent);
|
||||
}
|
||||
|
||||
private ValueTask<InstanceEntity?> FindInstanceEntity(ILazyDbContext db) {
|
||||
return db.Ctx.Instances.FindAsync(new object[] { instanceGuid }, cancellationToken);
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record StoreInstanceConfigurationCommand(Guid AuditLogUserGuid, bool IsCreatingInstance, InstanceConfiguration Configuration) : ICommand;
|
||||
|
||||
public sealed record StoreInstanceLaunchedCommand(Guid AuditLogUserGuid) : ICommand;
|
||||
|
||||
public sealed record StoreInstanceStoppedCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand;
|
||||
|
||||
public sealed record StoreInstanceCommandSentCommand(Guid AuditLogUserGuid, string Command) : ICommand;
|
||||
|
||||
private async Task StoreInstanceConfiguration(StoreInstanceConfigurationCommand command) {
|
||||
var configuration = command.Configuration;
|
||||
|
||||
await using (var db = dbProvider.Lazy()) {
|
||||
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(instanceGuid);
|
||||
entity.AgentGuid = configuration.AgentGuid;
|
||||
entity.InstanceName = configuration.InstanceName;
|
||||
entity.ServerPort = configuration.ServerPort;
|
||||
entity.RconPort = configuration.RconPort;
|
||||
entity.MinecraftVersion = configuration.MinecraftVersion;
|
||||
entity.MinecraftServerKind = configuration.MinecraftServerKind;
|
||||
entity.MemoryAllocation = configuration.MemoryAllocation;
|
||||
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
|
||||
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
if (command.IsCreatingInstance) {
|
||||
auditLogWriter.InstanceCreated(instanceGuid);
|
||||
}
|
||||
else {
|
||||
auditLogWriter.InstanceEdited(instanceGuid);
|
||||
}
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
Logger.Information("Stored instance \"{InstanceName}\" (GUID {InstanceGuid}) in database.", configuration.InstanceName, instanceGuid);
|
||||
}
|
||||
|
||||
private async Task StoreInstanceLaunched(StoreInstanceLaunchedCommand command) {
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
var entity = await FindInstanceEntity(db);
|
||||
if (entity != null) {
|
||||
entity.LaunchAutomatically = true;
|
||||
}
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
auditLogWriter.InstanceLaunched(instanceGuid);
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task StoreInstanceStopped(StoreInstanceStoppedCommand command) {
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
var entity = await FindInstanceEntity(db);
|
||||
if (entity != null) {
|
||||
entity.LaunchAutomatically = false;
|
||||
}
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
auditLogWriter.InstanceStopped(instanceGuid, command.StopStrategy.Seconds);
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task StoreInstanceCommandSent(StoreInstanceCommandSentCommand command) {
|
||||
await using var db = dbProvider.Lazy();
|
||||
|
||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||
auditLogWriter.InstanceCommandExecuted(instanceGuid, command.Command);
|
||||
|
||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
public sealed record StoreInstanceCommand() : ICommand;
|
||||
}
|
||||
|
@ -187,7 +187,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
|
||||
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
|
||||
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.Configuration));
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {
|
||||
|
@ -55,22 +55,23 @@ try {
|
||||
|
||||
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
||||
|
||||
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
|
||||
await controllerServices.Initialize();
|
||||
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
|
||||
await controllerServices.Initialize();
|
||||
|
||||
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
||||
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
|
||||
}
|
||||
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
||||
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
|
||||
}
|
||||
|
||||
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
||||
try {
|
||||
await Task.WhenAll(
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
|
||||
);
|
||||
} finally {
|
||||
await rpcTaskManager.Stop();
|
||||
NetMQConfig.Cleanup();
|
||||
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
||||
try {
|
||||
await Task.WhenAll(
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
||||
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
||||
);
|
||||
} finally {
|
||||
await rpcTaskManager.Stop();
|
||||
NetMQConfig.Cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -5,7 +5,6 @@ namespace Phantom.Utils.Actor;
|
||||
public readonly struct ActorConfiguration {
|
||||
public SupervisorStrategy? SupervisorStrategy { get; init; }
|
||||
public string? MailboxType { get; init; }
|
||||
public int? StashCapacity { get; init; }
|
||||
|
||||
internal Props Apply(Props props) {
|
||||
if (SupervisorStrategy != null) {
|
||||
@ -16,10 +15,6 @@ public readonly struct ActorConfiguration {
|
||||
props = props.WithMailbox(MailboxType);
|
||||
}
|
||||
|
||||
if (StashCapacity != null) {
|
||||
props = props.WithStashCapacity(StashCapacity.Value);
|
||||
}
|
||||
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
@ -28,8 +28,4 @@ public readonly struct ActorRef<TMessage> {
|
||||
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
|
||||
return Request(message, timeout: null, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<bool> Stop(TimeSpan? timeout = null) {
|
||||
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,6 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
|
||||
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
||||
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
|
||||
</ItemGroup>
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||
public string LoggerName => "Rpc:" + ServiceName;
|
||||
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||
}
|
||||
|
@ -1,64 +0,0 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
namespace Phantom.Utils.Rpc;
|
||||
|
||||
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
|
||||
|
||||
public static Props<ICommand> Factory(Init init) {
|
||||
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
|
||||
SupervisorStrategy = SupervisorStrategies.Resume,
|
||||
StashCapacity = 100
|
||||
});
|
||||
}
|
||||
|
||||
public IStash Stash { get; set; } = null!;
|
||||
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly MessageHandler<TServerListener> messageHandler;
|
||||
private readonly RpcConnectionToClient<TClientListener> connection;
|
||||
|
||||
private RpcReceiverActor(Init init) {
|
||||
this.messageDefinitions = init.MessageDefinitions;
|
||||
this.messageHandler = init.MessageHandler;
|
||||
this.connection = init.Connection;
|
||||
|
||||
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
|
||||
}
|
||||
|
||||
public interface ICommand {}
|
||||
|
||||
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
|
||||
|
||||
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
|
||||
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
|
||||
Handle(command.Data);
|
||||
|
||||
if (await connection.GetAuthorization()) {
|
||||
Stash.UnstashAll();
|
||||
|
||||
Become(() => {
|
||||
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
|
||||
});
|
||||
}
|
||||
}
|
||||
else if (Stash.IsFull) {
|
||||
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
|
||||
}
|
||||
else {
|
||||
Stash.Stash();
|
||||
}
|
||||
}
|
||||
|
||||
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
|
||||
Handle(command.Data);
|
||||
}
|
||||
|
||||
private void Handle(ReadOnlyMemory<byte> data) {
|
||||
messageDefinitions.ToServer.Handle(data, messageHandler);
|
||||
}
|
||||
}
|
@ -1,7 +1,5 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using NetMQ.Sockets;
|
||||
using Phantom.Utils.Actor;
|
||||
using Phantom.Utils.Logging;
|
||||
using Phantom.Utils.Rpc.Message;
|
||||
using Phantom.Utils.Rpc.Sockets;
|
||||
@ -11,29 +9,25 @@ using Serilog.Events;
|
||||
namespace Phantom.Utils.Rpc.Runtime;
|
||||
|
||||
public static class RpcServerRuntime {
|
||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
|
||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
|
||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
||||
var socket = RpcServerSocket.Connect(config);
|
||||
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
|
||||
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
||||
}
|
||||
|
||||
private readonly string serviceName;
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
||||
private readonly ActorSystem actorSystem;
|
||||
private readonly TaskManager taskManager;
|
||||
private readonly CancellationToken cancellationToken;
|
||||
|
||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||
this.serviceName = socket.Config.ServiceName;
|
||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
||||
this.messageDefinitions = messageDefinitions;
|
||||
this.listenerFactory = listenerFactory;
|
||||
this.actorSystem = actorSystem;
|
||||
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
|
||||
this.cancellationToken = cancellationToken;
|
||||
}
|
||||
@ -68,17 +62,18 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
}
|
||||
|
||||
var clientLoggerName = LoggerName + ":" + routingId;
|
||||
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
|
||||
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||
|
||||
connection.Closed += OnConnectionClosed;
|
||||
|
||||
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
|
||||
|
||||
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
|
||||
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
|
||||
clients[routingId] = client;
|
||||
client.EnqueueRegistrationMessage(messageType, data);
|
||||
}
|
||||
else {
|
||||
client.Enqueue(messageType, data);
|
||||
}
|
||||
|
||||
client.Enqueue(messageType, data);
|
||||
}
|
||||
|
||||
foreach (var client in clients.Values) {
|
||||
@ -99,22 +94,27 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
private sealed class Client : MessageHandler<TServerListener> {
|
||||
public RpcConnectionToClient<TClientListener> Connection { get; }
|
||||
|
||||
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
|
||||
private readonly RpcQueue processingQueue;
|
||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||
private readonly TaskManager taskManager;
|
||||
|
||||
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
||||
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
||||
this.Connection = connection;
|
||||
this.Connection.Closed += OnConnectionClosed;
|
||||
|
||||
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
|
||||
this.processingQueue = processingQueue;
|
||||
this.messageDefinitions = messageDefinitions;
|
||||
this.taskManager = taskManager;
|
||||
}
|
||||
|
||||
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
LogMessageType(messageType, data);
|
||||
processingQueue.Enqueue(() => Handle(data));
|
||||
}
|
||||
|
||||
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
LogMessageType(messageType, data);
|
||||
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
|
||||
}
|
||||
|
||||
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||
@ -123,6 +123,19 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
}
|
||||
}
|
||||
|
||||
private void Handle(ReadOnlyMemory<byte> data) {
|
||||
messageDefinitions.ToServer.Handle(data, this);
|
||||
}
|
||||
|
||||
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
|
||||
if (await Connection.GetAuthorization()) {
|
||||
Handle(data);
|
||||
}
|
||||
else {
|
||||
Logger.Warning("Dropped message after failed registration.");
|
||||
}
|
||||
}
|
||||
|
||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||
}
|
||||
@ -134,7 +147,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
||||
|
||||
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
|
||||
await StopReceiving();
|
||||
await receiverActor.Stop();
|
||||
await processingQueue.Stop();
|
||||
await Connection.StopSending();
|
||||
Logger.Debug("Connection closed.");
|
||||
});
|
||||
|
@ -19,6 +19,6 @@ public sealed class AgentManager {
|
||||
}
|
||||
|
||||
public ImmutableDictionary<Guid, Agent> ToDictionaryByGuid() {
|
||||
return agents.Value.ToImmutableDictionary(static agent => agent.AgentGuid);
|
||||
return agents.Value.ToImmutableDictionary(static agent => agent.Configuration.AgentGuid);
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ public sealed class InstanceManager {
|
||||
public EventSubscribers<InstanceDictionary> InstancesChanged => instances.Subs;
|
||||
|
||||
internal void RefreshInstances(ImmutableArray<Instance> newInstances) {
|
||||
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.InstanceGuid));
|
||||
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid));
|
||||
}
|
||||
|
||||
public InstanceDictionary GetAll() {
|
||||
@ -34,8 +34,8 @@ public sealed class InstanceManager {
|
||||
return instances.Value.GetValueOrDefault(instanceGuid);
|
||||
}
|
||||
|
||||
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, Guid instanceGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
|
||||
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, instanceGuid, configuration);
|
||||
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
|
||||
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, configuration);
|
||||
return controllerConnection.Send<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(message, cancellationToken);
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@
|
||||
}
|
||||
<Cell>
|
||||
<p class="fw-semibold">@configuration.AgentName</p>
|
||||
<small class="font-monospace text-uppercase">@agent.AgentGuid.ToString()</small>
|
||||
<small class="font-monospace text-uppercase">@configuration.AgentGuid.ToString()</small>
|
||||
</Cell>
|
||||
<Cell class="text-end">
|
||||
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@configuration.MaxInstances">
|
||||
@ -73,7 +73,7 @@
|
||||
protected override void OnInitialized() {
|
||||
AgentManager.AgentsChanged.Subscribe(this, agents => {
|
||||
var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.AgentName.CompareTo(a2.Configuration.AgentName));
|
||||
agentTable.UpdateFrom(sortedAgents, static agent => agent.AgentGuid, static agent => agent, static (agent, _) => agent);
|
||||
agentTable.UpdateFrom(sortedAgents, static agent => agent.Configuration.AgentGuid, static agent => agent, static (agent, _) => agent);
|
||||
InvokeAsync(StateHasChanged);
|
||||
});
|
||||
}
|
||||
|
@ -58,7 +58,7 @@
|
||||
try {
|
||||
logItems = await AuditLogManager.GetMostRecentItems(50, cancellationToken);
|
||||
userNamesByGuid = (await UserManager.GetAll(cancellationToken)).ToImmutableDictionary(static user => user.Guid, static user => user.Name);
|
||||
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
|
||||
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
|
||||
} finally {
|
||||
initializationCancellationTokenSource.Dispose();
|
||||
}
|
||||
|
@ -61,8 +61,8 @@
|
||||
|
||||
try {
|
||||
logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken);
|
||||
agentNamesByGuid = AgentManager.GetAll().ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.Configuration.AgentName);
|
||||
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
|
||||
agentNamesByGuid = AgentManager.GetAll().Select(static agent => agent.Configuration).ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.AgentName);
|
||||
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
|
||||
} finally {
|
||||
initializationCancellationTokenSource.Dispose();
|
||||
}
|
||||
|
@ -3,4 +3,4 @@
|
||||
@attribute [Authorize(Permission.CreateInstancesPolicy)]
|
||||
|
||||
<h1>New Instance</h1>
|
||||
<InstanceAddOrEditForm EditedInstance="null" />
|
||||
<InstanceAddOrEditForm EditedInstanceConfiguration="null" />
|
||||
|
@ -1,18 +1,18 @@
|
||||
@page "/instances/{InstanceGuid:guid}/edit"
|
||||
@attribute [Authorize(Permission.CreateInstancesPolicy)]
|
||||
@using Phantom.Common.Data.Web.Instance
|
||||
@using Phantom.Common.Data.Instance
|
||||
@using Phantom.Common.Data.Web.Users
|
||||
@using Phantom.Web.Services.Instances
|
||||
@inherits PhantomComponent
|
||||
@inherits Phantom.Web.Components.PhantomComponent
|
||||
@inject InstanceManager InstanceManager
|
||||
|
||||
@if (Instance == null) {
|
||||
@if (InstanceConfiguration == null) {
|
||||
<h1>Instance Not Found</h1>
|
||||
<p>Return to <a href="instances">all instances</a>.</p>
|
||||
}
|
||||
else {
|
||||
<h1>Edit Instance: @Instance.Configuration.InstanceName</h1>
|
||||
<InstanceAddOrEditForm EditedInstance="Instance" />
|
||||
<h1>Edit Instance: @InstanceConfiguration.InstanceName</h1>
|
||||
<InstanceAddOrEditForm EditedInstanceConfiguration="InstanceConfiguration" />
|
||||
}
|
||||
|
||||
@code {
|
||||
@ -20,10 +20,10 @@ else {
|
||||
[Parameter]
|
||||
public Guid InstanceGuid { get; init; }
|
||||
|
||||
private Instance? Instance { get; set; }
|
||||
private InstanceConfiguration? InstanceConfiguration { get; set; }
|
||||
|
||||
protected override void OnInitialized() {
|
||||
Instance = InstanceManager.GetByGuid(InstanceGuid);
|
||||
InstanceConfiguration = InstanceManager.GetByGuid(InstanceGuid)?.Configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,7 +16,7 @@
|
||||
<a href="instances/create" class="btn btn-primary" role="button">New Instance</a>
|
||||
</PermissionView>
|
||||
|
||||
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.InstanceGuid)">
|
||||
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.Configuration.InstanceGuid)">
|
||||
<HeaderRow>
|
||||
<Column Width="40%">Agent</Column>
|
||||
<Column Width="40%">Name</Column>
|
||||
@ -35,7 +35,7 @@
|
||||
</Cell>
|
||||
<Cell>
|
||||
<p class="fw-semibold">@configuration.InstanceName</p>
|
||||
<small class="font-monospace text-uppercase">@instance.InstanceGuid.ToString()</small>
|
||||
<small class="font-monospace text-uppercase">@configuration.InstanceGuid.ToString()</small>
|
||||
</Cell>
|
||||
<Cell>
|
||||
<InstanceStatusText Status="instance.Status" />
|
||||
@ -51,7 +51,7 @@
|
||||
<p class="font-monospace">@configuration.MemoryAllocation.InMegabytes.ToString() MB</p>
|
||||
</Cell>
|
||||
<Cell>
|
||||
<a href="instances/@instance.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
|
||||
<a href="instances/@configuration.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
|
||||
</Cell>
|
||||
</ItemRow>
|
||||
<NoItemsRow>
|
||||
@ -66,7 +66,7 @@
|
||||
|
||||
protected override void OnInitialized() {
|
||||
AgentManager.AgentsChanged.Subscribe(this, agents => {
|
||||
this.agentNamesByGuid = agents.ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.Configuration.AgentName);
|
||||
this.agentNamesByGuid = agents.Select(static agent => agent.Configuration).ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.AgentName);
|
||||
InvokeAsync(StateHasChanged);
|
||||
});
|
||||
|
||||
|
@ -8,9 +8,9 @@
|
||||
@using Phantom.Common.Data.Web.Minecraft
|
||||
@using Phantom.Common.Data.Web.Users
|
||||
@using Phantom.Common.Messages.Web.ToController
|
||||
@using Phantom.Common.Data.Instance
|
||||
@using Phantom.Common.Data.Java
|
||||
@using Phantom.Common.Data
|
||||
@using Phantom.Common.Data.Instance
|
||||
@using Phantom.Web.Services
|
||||
@using Phantom.Web.Services.Agents
|
||||
@using Phantom.Web.Services.Instances
|
||||
@ -28,7 +28,7 @@
|
||||
@{
|
||||
static RenderFragment GetAgentOption(Agent agent) {
|
||||
var configuration = agent.Configuration;
|
||||
return @<option value="@agent.AgentGuid">
|
||||
return @<option value="@configuration.AgentGuid">
|
||||
@configuration.AgentName
|
||||
•
|
||||
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(configuration.MaxInstances) @(configuration.MaxInstances == 1 ? "Instance" : "Instances")
|
||||
@ -37,7 +37,7 @@
|
||||
</option>;
|
||||
}
|
||||
}
|
||||
@if (EditedInstance == null) {
|
||||
@if (EditedInstanceConfiguration == null) {
|
||||
<FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid">
|
||||
<option value="" selected>Select which agent will run the instance...</option>
|
||||
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.AgentName)) {
|
||||
@ -159,14 +159,14 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<FormButtonSubmit Label="@(EditedInstance == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
|
||||
<FormButtonSubmit Label="@(EditedInstanceConfiguration == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
|
||||
<FormSubmitError />
|
||||
</Form>
|
||||
|
||||
@code {
|
||||
|
||||
[Parameter, EditorRequired]
|
||||
public Instance? EditedInstance { get; init; }
|
||||
public InstanceConfiguration? EditedInstanceConfiguration { get; init; }
|
||||
|
||||
private ConfigureInstanceFormModel form = null!;
|
||||
|
||||
@ -271,7 +271,7 @@
|
||||
}
|
||||
|
||||
protected override void OnInitialized() {
|
||||
form = new ConfigureInstanceFormModel(this, EditedInstance?.Configuration.MemoryAllocation);
|
||||
form = new ConfigureInstanceFormModel(this, EditedInstanceConfiguration?.MemoryAllocation);
|
||||
}
|
||||
|
||||
protected override async Task OnInitializedAsync() {
|
||||
@ -282,19 +282,18 @@
|
||||
allAgentJavaRuntimes = await agentJavaRuntimesTask;
|
||||
allMinecraftVersions = await minecraftVersionsTask;
|
||||
|
||||
if (EditedInstance != null) {
|
||||
var configuration = EditedInstance.Configuration;
|
||||
form.SelectedAgentGuid = configuration.AgentGuid;
|
||||
form.InstanceName = configuration.InstanceName;
|
||||
form.ServerPort = configuration.ServerPort;
|
||||
form.RconPort = configuration.RconPort;
|
||||
form.MinecraftVersion = configuration.MinecraftVersion;
|
||||
form.MinecraftServerKind = configuration.MinecraftServerKind;
|
||||
form.MemoryUnits = configuration.MemoryAllocation.RawValue;
|
||||
form.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
|
||||
form.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
||||
if (EditedInstanceConfiguration != null) {
|
||||
form.SelectedAgentGuid = EditedInstanceConfiguration.AgentGuid;
|
||||
form.InstanceName = EditedInstanceConfiguration.InstanceName;
|
||||
form.ServerPort = EditedInstanceConfiguration.ServerPort;
|
||||
form.RconPort = EditedInstanceConfiguration.RconPort;
|
||||
form.MinecraftVersion = EditedInstanceConfiguration.MinecraftVersion;
|
||||
form.MinecraftServerKind = EditedInstanceConfiguration.MinecraftServerKind;
|
||||
form.MemoryUnits = EditedInstanceConfiguration.MemoryAllocation.RawValue;
|
||||
form.JavaRuntimeGuid = EditedInstanceConfiguration.JavaRuntimeGuid;
|
||||
form.JvmArguments = JvmArgumentsHelper.Join(EditedInstanceConfiguration.JvmArguments);
|
||||
|
||||
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == configuration.MinecraftVersion)?.Type ?? minecraftVersionType;
|
||||
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == EditedInstanceConfiguration.MinecraftVersion)?.Type ?? minecraftVersionType;
|
||||
}
|
||||
|
||||
form.EditContext.RevalidateWhenFieldChanges(tracked: nameof(ConfigureInstanceFormModel.SelectedAgentGuid), revalidated: nameof(ConfigureInstanceFormModel.MemoryUnits));
|
||||
@ -329,9 +328,9 @@
|
||||
return;
|
||||
}
|
||||
|
||||
var instanceGuid = EditedInstance?.InstanceGuid ?? Guid.NewGuid();
|
||||
var instanceConfiguration = new InstanceConfiguration(
|
||||
EditedInstance?.Configuration.AgentGuid ?? selectedAgent.AgentGuid,
|
||||
var instance = new InstanceConfiguration(
|
||||
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Configuration.AgentGuid,
|
||||
EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(),
|
||||
form.InstanceName,
|
||||
(ushort) form.ServerPort,
|
||||
(ushort) form.RconPort,
|
||||
@ -342,9 +341,9 @@
|
||||
JvmArgumentsHelper.Split(form.JvmArguments)
|
||||
);
|
||||
|
||||
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instanceGuid, instanceConfiguration, CancellationToken);
|
||||
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instance, CancellationToken);
|
||||
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
|
||||
await Navigation.NavigateTo("instances/" + instanceGuid);
|
||||
await Navigation.NavigateTo("instances/" + instance.InstanceGuid);
|
||||
}
|
||||
else {
|
||||
form.SubmitModel.StopSubmitting(result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));
|
||||
|
Loading…
Reference in New Issue
Block a user