1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-19 00:42:50 +02:00

Compare commits

..

2 Commits

Author SHA1 Message Date
dcfe66a337
Move RPC message receiving to Akka.NET 2024-03-15 22:00:37 +01:00
0d039b69de
Implement actors in Controller via Akka.NET 2024-03-11 00:01:54 +01:00
36 changed files with 536 additions and 459 deletions

View File

@ -17,6 +17,7 @@ sealed class Instance : IAsyncDisposable {
private IServerLauncher Launcher { get; set; }
private readonly SemaphoreSlim configurationSemaphore = new (1, 1);
private readonly Guid instanceGuid;
private readonly string shortName;
private readonly ILogger logger;
@ -29,7 +30,8 @@ sealed class Instance : IAsyncDisposable {
private readonly InstanceProcedureManager procedureManager;
public Instance(string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
public Instance(Guid instanceGuid, string shortName, InstanceServices services, InstanceConfiguration configuration, IServerLauncher launcher) {
this.instanceGuid = instanceGuid;
this.shortName = shortName;
this.logger = PhantomLogger.Create<Instance>(shortName);
@ -44,16 +46,16 @@ sealed class Instance : IAsyncDisposable {
}
public void ReportLastStatus() {
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, currentStatus));
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, currentStatus));
}
private void ReportAndSetStatus(IInstanceStatus status) {
currentStatus = status;
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(Configuration.InstanceGuid, status));
Services.ControllerConnection.Send(new ReportInstanceStatusMessage(instanceGuid, status));
}
private void ReportEvent(IInstanceEvent instanceEvent) {
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, Configuration.InstanceGuid, instanceEvent));
Services.ControllerConnection.Send(new ReportInstanceEventMessage(Guid.NewGuid(), DateTime.UtcNow, instanceGuid, instanceEvent));
}
internal void TransitionState(IInstanceState newState) {
@ -99,7 +101,7 @@ sealed class Instance : IAsyncDisposable {
await configurationSemaphore.WaitAsync(cancellationToken);
try {
procedure = new LaunchInstanceProcedure(Configuration, Launcher);
procedure = new LaunchInstanceProcedure(instanceGuid, Configuration, Launcher);
} finally {
configurationSemaphore.Release();
}

View File

@ -75,9 +75,8 @@ sealed class InstanceSessionManager : IAsyncDisposable {
});
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(Guid instanceGuid, InstanceConfiguration configuration, InstanceLaunchProperties launchProperties, bool launchNow, bool alwaysReportStatus) {
return await AcquireSemaphoreAndRun(async () => {
var instanceGuid = configuration.InstanceGuid;
var instanceFolder = Path.Combine(basePath, instanceGuid.ToString());
Directories.Create(instanceFolder, Chmod.URWX_GRX);
@ -106,15 +105,15 @@ sealed class InstanceSessionManager : IAsyncDisposable {
if (instances.TryGetValue(instanceGuid, out var instance)) {
await instance.Reconfigure(configuration, launcher, shutdownCancellationToken);
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
Logger.Information("Reconfigured instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
if (alwaysReportStatus) {
instance.ReportLastStatus();
}
}
else {
instances[instanceGuid] = instance = new Instance(GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, configuration.InstanceGuid);
instances[instanceGuid] = instance = new Instance(instanceGuid, GetInstanceLoggerName(instanceGuid), instanceServices, configuration, launcher);
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
instance.ReportLastStatus();
instance.IsRunningChanged += OnInstanceIsRunningChanged;

View File

@ -6,7 +6,7 @@ using Phantom.Common.Data.Instance;
namespace Phantom.Agent.Services.Instances.Procedures;
sealed record LaunchInstanceProcedure(InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
sealed record LaunchInstanceProcedure(Guid InstanceGuid, InstanceConfiguration Configuration, IServerLauncher Launcher, bool IsRestarting = false) : IInstanceProcedure {
public async Task<IInstanceState?> Run(IInstanceContext context, CancellationToken cancellationToken) {
if (!IsRestarting && context.CurrentState is InstanceRunningState) {
return null;
@ -30,7 +30,7 @@ sealed record LaunchInstanceProcedure(InstanceConfiguration Configuration, IServ
context.Logger.Information("Session starting...");
try {
InstanceProcess process = await DoLaunch(context, cancellationToken);
return new InstanceRunningState(Configuration, Launcher, process, context);
return new InstanceRunningState(InstanceGuid, Configuration, Launcher, process, context);
} catch (OperationCanceledException) {
context.SetStatus(InstanceStatus.NotRunning);
} catch (LaunchFailureException e) {

View File

@ -12,6 +12,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
internal bool IsStopping { get; set; }
private readonly Guid instanceGuid;
private readonly InstanceConfiguration configuration;
private readonly IServerLauncher launcher;
private readonly IInstanceContext context;
@ -21,13 +22,14 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
private bool isDisposed;
public InstanceRunningState(InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
public InstanceRunningState(Guid instanceGuid, InstanceConfiguration configuration, IServerLauncher launcher, InstanceProcess process, IInstanceContext context) {
this.instanceGuid = instanceGuid;
this.configuration = configuration;
this.launcher = launcher;
this.context = context;
this.Process = process;
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, configuration.InstanceGuid, context.ShortName);
this.logSender = new InstanceLogSender(context.Services.ControllerConnection, context.Services.TaskManager, instanceGuid, context.ShortName);
this.backupScheduler = new BackupScheduler(context.Services.TaskManager, context.Services.BackupManager, process, context, configuration.ServerPort);
this.backupScheduler.BackupCompleted += OnScheduledBackupCompleted;
@ -64,7 +66,7 @@ sealed class InstanceRunningState : IInstanceState, IDisposable {
else {
context.Logger.Information("Session ended unexpectedly, restarting...");
context.ReportEvent(InstanceEvent.Crashed);
context.EnqueueProcedure(new LaunchInstanceProcedure(configuration, launcher, IsRestarting: true));
context.EnqueueProcedure(new LaunchInstanceProcedure(instanceGuid, configuration, launcher, IsRestarting: true));
}
}

View File

@ -27,15 +27,15 @@ public sealed class MessageListener : IMessageToAgentListener {
public async Task<NoReply> HandleRegisterAgentSuccess(RegisterAgentSuccessMessage message) {
Logger.Information("Agent authentication successful.");
void ShutdownAfterConfigurationFailed(InstanceConfiguration configuration) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, configuration.InstanceGuid);
void ShutdownAfterConfigurationFailed(Guid instanceGuid, InstanceConfiguration configuration) {
Logger.Fatal("Unable to configure instance \"{Name}\" (GUID {Guid}), shutting down.", configuration.InstanceName, instanceGuid);
shutdownTokenSource.Cancel();
}
foreach (var configureInstanceMessage in message.InitialInstanceConfigurations) {
var result = await HandleConfigureInstance(configureInstanceMessage, alwaysReportStatus: true);
if (!result.Is(ConfigureInstanceResult.Success)) {
ShutdownAfterConfigurationFailed(configureInstanceMessage.Configuration);
ShutdownAfterConfigurationFailed(configureInstanceMessage.InstanceGuid, configureInstanceMessage.Configuration);
return NoReply.Instance;
}
}
@ -64,7 +64,7 @@ public sealed class MessageListener : IMessageToAgentListener {
}
private Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message, bool alwaysReportStatus) {
return agent.InstanceSessionManager.Configure(message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
return agent.InstanceSessionManager.Configure(message.InstanceGuid, message.Configuration, message.LaunchProperties, message.LaunchNow, alwaysReportStatus);
}
public async Task<InstanceActionResult<ConfigureInstanceResult>> HandleConfigureInstance(ConfigureInstanceMessage message) {

View File

@ -5,9 +5,10 @@ namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Agent(
[property: MemoryPackOrder(0)] AgentConfiguration Configuration,
[property: MemoryPackOrder(1)] AgentStats? Stats,
[property: MemoryPackOrder(2)] IAgentConnectionStatus ConnectionStatus
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] AgentConfiguration Configuration,
[property: MemoryPackOrder(2)] AgentStats? Stats,
[property: MemoryPackOrder(3)] IAgentConnectionStatus ConnectionStatus
) {
[MemoryPackIgnore]
public RamAllocationUnits? AvailableMemory => Configuration.MaxMemory - Stats?.RunningInstanceMemory;

View File

@ -5,16 +5,15 @@ namespace Phantom.Common.Data.Web.Agent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record AgentConfiguration(
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] string AgentName,
[property: MemoryPackOrder(2)] ushort ProtocolVersion,
[property: MemoryPackOrder(3)] string BuildVersion,
[property: MemoryPackOrder(4)] ushort MaxInstances,
[property: MemoryPackOrder(5)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedServerPorts = null,
[property: MemoryPackOrder(7)] AllowedPorts? AllowedRconPorts = null
[property: MemoryPackOrder(0)] string AgentName,
[property: MemoryPackOrder(1)] ushort ProtocolVersion,
[property: MemoryPackOrder(2)] string BuildVersion,
[property: MemoryPackOrder(3)] ushort MaxInstances,
[property: MemoryPackOrder(4)] RamAllocationUnits MaxMemory,
[property: MemoryPackOrder(5)] AllowedPorts? AllowedServerPorts = null,
[property: MemoryPackOrder(6)] AllowedPorts? AllowedRconPorts = null
) {
public static AgentConfiguration From(AgentInfo agentInfo) {
return new AgentConfiguration(agentInfo.AgentGuid, agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
return new AgentConfiguration(agentInfo.AgentName, agentInfo.ProtocolVersion, agentInfo.BuildVersion, agentInfo.MaxInstances, agentInfo.MaxMemory, agentInfo.AllowedServerPorts, agentInfo.AllowedRconPorts);
}
}

View File

@ -5,11 +5,12 @@ namespace Phantom.Common.Data.Web.Instance;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Instance(
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(1)] IInstanceStatus Status,
[property: MemoryPackOrder(2)] bool LaunchAutomatically
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] IInstanceStatus Status,
[property: MemoryPackOrder(3)] bool LaunchAutomatically
) {
public static Instance Offline(InstanceConfiguration configuration, bool launchAutomatically = false) {
return new Instance(configuration, InstanceStatus.Offline, launchAutomatically);
public static Instance Offline(Guid instanceGuid, InstanceConfiguration configuration, bool launchAutomatically = false) {
return new Instance(instanceGuid, configuration, InstanceStatus.Offline, launchAutomatically);
}
}

View File

@ -7,13 +7,12 @@ namespace Phantom.Common.Data.Instance;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record InstanceConfiguration(
[property: MemoryPackOrder(0)] Guid AgentGuid,
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] string InstanceName,
[property: MemoryPackOrder(3)] ushort ServerPort,
[property: MemoryPackOrder(4)] ushort RconPort,
[property: MemoryPackOrder(5)] string MinecraftVersion,
[property: MemoryPackOrder(6)] MinecraftServerKind MinecraftServerKind,
[property: MemoryPackOrder(7)] RamAllocationUnits MemoryAllocation,
[property: MemoryPackOrder(8)] Guid JavaRuntimeGuid,
[property: MemoryPackOrder(9)] ImmutableArray<string> JvmArguments
[property: MemoryPackOrder(1)] string InstanceName,
[property: MemoryPackOrder(2)] ushort ServerPort,
[property: MemoryPackOrder(3)] ushort RconPort,
[property: MemoryPackOrder(4)] string MinecraftVersion,
[property: MemoryPackOrder(5)] MinecraftServerKind MinecraftServerKind,
[property: MemoryPackOrder(6)] RamAllocationUnits MemoryAllocation,
[property: MemoryPackOrder(7)] Guid JavaRuntimeGuid,
[property: MemoryPackOrder(8)] ImmutableArray<string> JvmArguments
);

View File

@ -6,9 +6,10 @@ namespace Phantom.Common.Messages.Agent.ToAgent;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record ConfigureInstanceMessage(
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(1)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(2)] bool LaunchNow = false
[property: MemoryPackOrder(0)] Guid InstanceGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration,
[property: MemoryPackOrder(2)] InstanceLaunchProperties LaunchProperties,
[property: MemoryPackOrder(3)] bool LaunchNow = false
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
return listener.HandleConfigureInstance(this);

View File

@ -8,7 +8,8 @@ namespace Phantom.Common.Messages.Web.ToController;
[MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record CreateOrUpdateInstanceMessage(
[property: MemoryPackOrder(0)] Guid LoggedInUserGuid,
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
[property: MemoryPackOrder(1)] Guid InstanceGuid,
[property: MemoryPackOrder(2)] InstanceConfiguration Configuration
) : IMessageToController<InstanceActionResult<CreateOrUpdateInstanceResult>> {
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> Accept(IMessageToControllerListener listener) {
return listener.HandleCreateOrUpdateInstance(this);

View File

@ -5,6 +5,8 @@ using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
@ -12,8 +14,10 @@ using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Actor.Tasks;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Serilog;
@ -26,7 +30,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public readonly record struct Init(Guid AgentGuid, AgentConfiguration AgentConfiguration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
@ -37,6 +41,8 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly Guid agentGuid;
private AgentConfiguration configuration;
private AgentStats? stats;
private ImmutableArray<TaggedJavaRuntime> javaRuntimes = ImmutableArray<TaggedJavaRuntime>.Empty;
@ -61,8 +67,8 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
}
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly ActorRef<AgentInstanceRouterActor.ICommand> instanceRouterActor;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
private AgentActor(Init init) {
@ -71,11 +77,11 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
this.configuration = init.Configuration;
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
this.agentGuid = init.AgentGuid;
this.configuration = init.AgentConfiguration;
this.connection = new AgentConnection(agentGuid, configuration.AgentName);
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
this.instanceRouterActor = Context.ActorOf(AgentInstanceRouterActor.Factory(new AgentInstanceRouterActor.Init(SelfTyped, connection, minecraftVersions, dbProvider, cancellationToken)), "InstanceRouter");
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(agentGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
NotifyAgentUpdated();
@ -86,12 +92,16 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
Receive<UpdateStatsCommand>(UpdateStats);
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
Receive<RouteToInstanceCommand>(RouteToInstance);
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
}
private void NotifyAgentUpdated() {
controllerState.UpdateAgent(new Agent(configuration, stats, ConnectionStatus));
controllerState.UpdateAgent(new Agent(agentGuid, configuration, stats, ConnectionStatus));
}
protected override void PreStart() {
@ -100,22 +110,56 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
}
private void CreateNewInstance(Instance instance) {
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
UpdateInstanceData(instance);
instanceRouterActor.Tell(new AgentInstanceRouterActor.InitializeInstanceCommand(instance));
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instance.InstanceGuid, instanceActor);
return instanceActor;
}
private void UpdateInstanceData(Instance instance) {
instanceDataByGuid[instance.Configuration.InstanceGuid] = instance;
instanceDataByGuid[instance.InstanceGuid] = instance;
controllerState.UpdateInstance(instance);
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
var name = "Instance:" + instance.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Tell(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
private void TellAllInstances(InstanceActor.ICommand command) {
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(command);
}
}
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
return instance.Request(command, cancellationToken);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
}
}
private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
foreach (var (instanceGuid, instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
configurationMessages.Add(new ConfigureInstanceMessage(instanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
@ -137,16 +181,23 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
public sealed record RouteToInstanceCommand(AgentInstanceRouterActor.ICommand Command) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
private async Task Initialize(InitializeCommand command) {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == configuration.AgentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
await foreach (var entity in ctx.Instances.Where(instance => instance.AgentGuid == agentGuid).AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var instanceConfiguration = new InstanceConfiguration(
entity.AgentGuid,
entity.InstanceGuid,
entity.InstanceName,
entity.ServerPort,
entity.RconPort,
@ -157,7 +208,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
JvmArgumentsHelper.Split(entity.JvmArguments)
);
CreateNewInstance(Instance.Offline(instanceConfiguration, entity.LaunchAutomatically));
CreateNewInstance(Instance.Offline(entity.InstanceGuid, instanceConfiguration, entity.LaunchAutomatically));
}
}
@ -171,9 +222,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
isOnline = true;
NotifyAgentUpdated();
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentConfigurationCommand(configuration));
return configurationMessages;
}
@ -185,9 +236,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
isOnline = false;
NotifyAgentUpdated();
instanceRouterActor.Tell(new AgentInstanceRouterActor.MarkInstancesAsOfflineCommand());
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
}
}
@ -196,7 +247,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
isOnline = false;
NotifyAgentUpdated();
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
Logger.Warning("Lost connection to agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, agentGuid);
}
}
@ -216,11 +267,79 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private void UpdateJavaRuntimes(UpdateJavaRuntimesCommand command) {
javaRuntimes = command.JavaRuntimes;
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
controllerState.UpdateAgentJavaRuntimes(agentGuid, javaRuntimes);
}
private void RouteToInstance(RouteToInstanceCommand command) {
instanceRouterActor.Forward(command.Command);
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
var instanceConfiguration = command.Configuration;
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
}
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance1, command)
.Unwrap();
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
if (serverExecutableInfo == null) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
}
var instanceConfiguration = command.Configuration;
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(command.InstanceGuid, out var instanceActorRef);
if (isCreatingInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(command.InstanceGuid, instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, command.InstanceGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
}
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
var instanceGuid = command.InstanceGuid;
var instanceName = command.Configuration.InstanceName;
var isCreating = command.IsCreatingInstance;
if (result.Is(ConfigureInstanceResult.Success)) {
string action = isCreating ? "Added" : "Edited";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
}
else {
string action = isCreating ? "adding" : "editing";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
return result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
}
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {

View File

@ -1,4 +1,4 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Web.Agent;
using Phantom.Controller.Database;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
@ -19,7 +19,7 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private StoreAgentCommand? lastStoreCommand;
private AgentConfiguration? configurationToStore;
private bool hasScheduledFlush;
private AgentDatabaseStorageActor(Init init) {
@ -27,25 +27,25 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<StoreAgentCommand>(StoreAgent);
Receive<StoreAgentConfigurationCommand>(StoreAgentConfiguration);
ReceiveAsync<FlushChangesCommand>(FlushChanges);
}
public interface ICommand {}
public sealed record StoreAgentCommand(string Name, ushort ProtocolVersion, string BuildVersion, ushort MaxInstances, RamAllocationUnits MaxMemory) : ICommand;
public sealed record StoreAgentConfigurationCommand(AgentConfiguration Configuration) : ICommand;
private sealed record FlushChangesCommand : ICommand;
private void StoreAgent(StoreAgentCommand command) {
this.lastStoreCommand = command;
private void StoreAgentConfiguration(StoreAgentConfigurationCommand command) {
this.configurationToStore = command.Configuration;
ScheduleFlush(TimeSpan.FromSeconds(2));
}
private async Task FlushChanges(FlushChangesCommand command) {
hasScheduledFlush = false;
if (lastStoreCommand == null) {
if (configurationToStore == null) {
return;
}
@ -53,22 +53,22 @@ sealed class AgentDatabaseStorageActor : ReceiveActor<AgentDatabaseStorageActor.
await using var ctx = dbProvider.Eager();
var entity = ctx.AgentUpsert.Fetch(agentGuid);
entity.Name = lastStoreCommand.Name;
entity.ProtocolVersion = lastStoreCommand.ProtocolVersion;
entity.BuildVersion = lastStoreCommand.BuildVersion;
entity.MaxInstances = lastStoreCommand.MaxInstances;
entity.MaxMemory = lastStoreCommand.MaxMemory;
entity.Name = configurationToStore.AgentName;
entity.ProtocolVersion = configurationToStore.ProtocolVersion;
entity.BuildVersion = configurationToStore.BuildVersion;
entity.MaxInstances = configurationToStore.MaxInstances;
entity.MaxMemory = configurationToStore.MaxMemory;
await ctx.SaveChangesAsync(cancellationToken);
} catch (Exception e) {
ScheduleFlush(TimeSpan.FromSeconds(10));
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
Logger.Error(e, "Could not store agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
return;
}
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) to database.", lastStoreCommand.Name, agentGuid);
Logger.Information("Stored agent \"{AgentName}\" (GUID {AgentGuid}) in database.", configurationToStore.AgentName, agentGuid);
lastStoreCommand = null;
configurationToStore = null;
}
private void ScheduleFlush(TimeSpan delay) {

View File

@ -1,188 +0,0 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Actor.Tasks;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentInstanceRouterActor : ReceiveActor<AgentInstanceRouterActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentInstanceRouterActor>();
public readonly record struct Init(ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentInstanceRouterActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private AgentInstanceRouterActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<InitializeInstanceCommand>(InitializeInstance);
Receive<MarkInstancesAsOfflineCommand>(MarkInstancesAsOffline);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
var instanceGuid = instance.Configuration.InstanceGuid;
if (instanceActorByGuid.ContainsKey(instanceGuid)) {
throw new InvalidOperationException("Instance already exists: " + instanceGuid);
}
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instanceGuid, instanceActor);
return instanceActor;
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, agentActorRef, agentConnection, dbProvider, cancellationToken);
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Tell(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
private void TellAllInstances(InstanceActor.ICommand command) {
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(command);
}
}
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
return instance.Request(command, cancellationToken);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
}
}
public interface ICommand {}
public sealed record InitializeInstanceCommand(Instance Instance) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record MarkInstancesAsOfflineCommand : ICommand;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void InitializeInstance(InitializeInstanceCommand command) {
CreateNewInstance(command.Instance);
}
private void MarkInstancesAsOffline(MarkInstancesAsOfflineCommand command) {
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
var instanceConfiguration = command.Configuration;
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
}
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance1, command)
.Unwrap();
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
if (serverExecutableInfo == null) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
}
var instanceConfiguration = command.Configuration;
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(instanceConfiguration.InstanceGuid, out var instanceActorRef);
if (isCreatingInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
}
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var isCreating = command.IsCreatingInstance;
if (result.Is(ConfigureInstanceResult.Success)) {
string action = isCreating ? "Added" : "Edited";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
}
else {
string action = isCreating ? "adding" : "editing";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
return result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
}
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
}
}

View File

@ -36,12 +36,12 @@ sealed class AgentManager {
this.dbProvider = dbProvider;
this.cancellationToken = cancellationToken;
addAgentActorFactory = (_, agent) => CreateAgentActor(agent);
this.addAgentActorFactory = CreateAgentActor;
}
private ActorRef<AgentActor.ICommand> CreateAgentActor(AgentConfiguration agentConfiguration) {
var init = new AgentActor.Init(agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
var name = "Agent:" + agentConfiguration.AgentGuid;
private ActorRef<AgentActor.ICommand> CreateAgentActor(Guid agentGuid, AgentConfiguration agentConfiguration) {
var init = new AgentActor.Init(agentGuid, agentConfiguration, controllerState, minecraftVersions, dbProvider, cancellationToken);
var name = "Agent:" + agentGuid;
return actorSystem.ActorOf(AgentActor.Factory(init), name);
}
@ -49,10 +49,11 @@ sealed class AgentManager {
await using var ctx = dbProvider.Eager();
await foreach (var entity in ctx.Agents.AsAsyncEnumerable().WithCancellation(cancellationToken)) {
var agentProperties = new AgentConfiguration(entity.AgentGuid, entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
var agentGuid = entity.AgentGuid;
var agentConfiguration = new AgentConfiguration(entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
if (agentsByGuid.TryAdd(entity.AgentGuid, CreateAgentActor(agentProperties))) {
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentProperties.AgentName, agentProperties.AgentGuid);
if (agentsByGuid.TryAdd(agentGuid, CreateAgentActor(agentGuid, agentConfiguration))) {
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentConfiguration.AgentName, agentGuid);
}
}
}
@ -82,9 +83,9 @@ sealed class AgentManager {
}
}
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentInstanceRouterActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
return await agent.Request(new AgentActor.RouteToInstanceCommand(command), cancellationToken);
return await agent.Request(command, cancellationToken);
}
else {
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);

View File

@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
namespace Phantom.Controller.Services;
public sealed class ControllerServices : IAsyncDisposable {
public ActorSystem ActorSystem { get; }
private TaskManager TaskManager { get; }
private ControllerState ControllerState { get; }
private MinecraftVersions MinecraftVersions { get; }
@ -33,7 +35,6 @@ public sealed class ControllerServices : IAsyncDisposable {
private UserLoginManager UserLoginManager { get; }
private AuditLogManager AuditLogManager { get; }
private readonly ActorSystem actorSystem;
private readonly IDbContextProvider dbProvider;
private readonly AuthToken webAuthToken;
private readonly CancellationToken cancellationToken;
@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
this.webAuthToken = webAuthToken;
this.cancellationToken = shutdownCancellationToken;
this.actorSystem = ActorSystemFactory.Create("Controller");
this.ActorSystem = ActorSystemFactory.Create("Controller");
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
this.ControllerState = new ControllerState();
this.MinecraftVersions = new MinecraftVersions();
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
this.InstanceLogManager = new InstanceLogManager();
this.UserManager = new UserManager(dbProvider);
@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
}
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
}
public async Task Initialize() {
@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
}
public async ValueTask DisposeAsync() {
await actorSystem.Terminate();
actorSystem.Dispose();
await ActorSystem.Terminate();
ActorSystem.Dispose();
}
}

View File

@ -20,7 +20,7 @@ sealed class ControllerState {
public ObservableState<ImmutableDictionary<Guid, Instance>>.Receiver InstancesByGuidReceiver => instancesByGuid.ReceiverSide;
public void UpdateAgent(Agent agent) {
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.Configuration.AgentGuid, agent), agent);
agentsByGuid.PublisherSide.Publish(static (agentsByGuid, agent) => agentsByGuid.SetItem(agent.AgentGuid, agent), agent);
}
public void UpdateAgentJavaRuntimes(Guid agentGuid, ImmutableArray<TaggedJavaRuntime> runtimes) {
@ -28,6 +28,6 @@ sealed class ControllerState {
}
public void UpdateInstance(Instance instance) {
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.Configuration.InstanceGuid, instance), instance);
instancesByGuid.PublisherSide.Publish(static (instancesByGuid, instance) => instancesByGuid.SetItem(instance.InstanceGuid, instance), instance);
}
}

View File

@ -2,12 +2,9 @@
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Controller.Services.Agents;
using Phantom.Utils.Actor;
@ -22,25 +19,24 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private Guid InstanceGuid => configuration.InstanceGuid;
private readonly Guid instanceGuid;
private InstanceConfiguration configuration;
private IInstanceStatus status;
private bool launchAutomatically;
private readonly ActorRef<InstanceDatabaseStorageActor.ICommand> databaseStorageActor;
private InstanceActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
var instance = init.Instance;
this.configuration = instance.Configuration;
this.status = instance.Status;
this.launchAutomatically = instance.LaunchAutomatically;
(this.instanceGuid, this.configuration, this.status, this.launchAutomatically) = init.Instance;
this.databaseStorageActor = Context.ActorOf(InstanceDatabaseStorageActor.Factory(new InstanceDatabaseStorageActor.Init(instanceGuid, init.DbProvider, init.CancellationToken)), "DatabaseStorage");
Receive<SetStatusCommand>(SetStatus);
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
@ -50,8 +46,14 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
private void NotifyInstanceUpdated() {
var instance = new Instance(configuration, status, launchAutomatically);
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(instance));
agentActorRef.Tell(new AgentActor.ReceiveInstanceDataCommand(new Instance(instanceGuid, configuration, status, launchAutomatically)));
}
private void SetLaunchAutomatically(bool newValue) {
if (launchAutomatically != newValue) {
launchAutomatically = newValue;
NotifyInstanceUpdated();
}
}
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
@ -63,7 +65,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, Guid InstanceGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
@ -77,87 +79,55 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
private async Task<InstanceActionResult<ConfigureInstanceResult>> ConfigureInstance(ConfigureInstanceCommand command) {
var message = new ConfigureInstanceMessage(command.Configuration, command.LaunchProperties);
var message = new ConfigureInstanceMessage(command.InstanceGuid, command.Configuration, command.LaunchProperties);
var result = await SendInstanceActionMessage<ConfigureInstanceMessage, ConfigureInstanceResult>(message);
if (result.Is(ConfigureInstanceResult.Success)) {
configuration = command.Configuration;
NotifyInstanceUpdated();
await using var db = dbProvider.Lazy();
var storeCommand = new InstanceDatabaseStorageActor.StoreInstanceConfigurationCommand(
command.AuditLogUserGuid,
command.IsCreatingInstance,
configuration
);
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsCreatingInstance) {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
}
else {
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
databaseStorageActor.Tell(storeCommand);
}
return result;
}
private async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
var message = new LaunchInstanceMessage(InstanceGuid);
var message = new LaunchInstanceMessage(instanceGuid);
var result = await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(message);
if (result.Is(LaunchInstanceResult.LaunchInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(true, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceLaunched(InstanceGuid));
SetLaunchAutomatically(true);
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceLaunchedCommand(command.AuditLogUserGuid));
}
return result;
}
private async Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
var message = new StopInstanceMessage(InstanceGuid, command.StopStrategy);
var message = new StopInstanceMessage(instanceGuid, command.StopStrategy);
var result = await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(message);
if (result.Is(StopInstanceResult.StopInitiated)) {
await HandleInstanceManuallyLaunchedOrStopped(false, command.AuditLogUserGuid, auditLogWriter => auditLogWriter.InstanceStopped(InstanceGuid, command.StopStrategy.Seconds));
SetLaunchAutomatically(false);
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceStoppedCommand(command.AuditLogUserGuid, command.StopStrategy));
}
return result;
}
private async Task HandleInstanceManuallyLaunchedOrStopped(bool wasLaunched, Guid auditLogUserGuid, Action<AuditLogRepository.ItemWriter> addAuditEvent) {
if (launchAutomatically != wasLaunched) {
launchAutomatically = wasLaunched;
NotifyInstanceUpdated();
}
await using var db = dbProvider.Lazy();
var entity = await db.Ctx.Instances.FindAsync(new object[] { InstanceGuid }, cancellationToken);
if (entity != null) {
entity.LaunchAutomatically = wasLaunched;
addAuditEvent(new AuditLogRepository(db).Writer(auditLogUserGuid));
await db.Ctx.SaveChangesAsync(cancellationToken);
}
}
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
var message = new SendCommandToInstanceMessage(instanceGuid, command.Command);
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
if (result.Is(SendCommandToInstanceResult.Success)) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceCommandExecuted(InstanceGuid, command.Command);
await db.Ctx.SaveChangesAsync(cancellationToken);
databaseStorageActor.Tell(new InstanceDatabaseStorageActor.StoreInstanceCommandSentCommand(command.AuditLogUserGuid, command.Command));
}
return result;

View File

@ -1,9 +1,116 @@
using Phantom.Utils.Actor;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Controller.Database;
using Phantom.Controller.Database.Entities;
using Phantom.Controller.Database.Repositories;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Instances;
sealed class InstanceDatabaseStorageActor : ReceiveActor<InstanceDatabaseStorageActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<InstanceDatabaseStorageActor>();
public readonly record struct Init(Guid InstanceGuid, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new InstanceDatabaseStorageActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
}
private readonly Guid instanceGuid;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private InstanceDatabaseStorageActor(Init init) {
this.instanceGuid = init.InstanceGuid;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
ReceiveAsync<StoreInstanceConfigurationCommand>(StoreInstanceConfiguration);
ReceiveAsync<StoreInstanceLaunchedCommand>(StoreInstanceLaunched);
ReceiveAsync<StoreInstanceStoppedCommand>(StoreInstanceStopped);
ReceiveAsync<StoreInstanceCommandSentCommand>(StoreInstanceCommandSent);
}
private ValueTask<InstanceEntity?> FindInstanceEntity(ILazyDbContext db) {
return db.Ctx.Instances.FindAsync(new object[] { instanceGuid }, cancellationToken);
}
public interface ICommand {}
public sealed record StoreInstanceCommand() : ICommand;
public sealed record StoreInstanceConfigurationCommand(Guid AuditLogUserGuid, bool IsCreatingInstance, InstanceConfiguration Configuration) : ICommand;
public sealed record StoreInstanceLaunchedCommand(Guid AuditLogUserGuid) : ICommand;
public sealed record StoreInstanceStoppedCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand;
public sealed record StoreInstanceCommandSentCommand(Guid AuditLogUserGuid, string Command) : ICommand;
private async Task StoreInstanceConfiguration(StoreInstanceConfigurationCommand command) {
var configuration = command.Configuration;
await using (var db = dbProvider.Lazy()) {
InstanceEntity entity = db.Ctx.InstanceUpsert.Fetch(instanceGuid);
entity.AgentGuid = configuration.AgentGuid;
entity.InstanceName = configuration.InstanceName;
entity.ServerPort = configuration.ServerPort;
entity.RconPort = configuration.RconPort;
entity.MinecraftVersion = configuration.MinecraftVersion;
entity.MinecraftServerKind = configuration.MinecraftServerKind;
entity.MemoryAllocation = configuration.MemoryAllocation;
entity.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsCreatingInstance) {
auditLogWriter.InstanceCreated(instanceGuid);
}
else {
auditLogWriter.InstanceEdited(instanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
}
Logger.Information("Stored instance \"{InstanceName}\" (GUID {InstanceGuid}) in database.", configuration.InstanceName, instanceGuid);
}
private async Task StoreInstanceLaunched(StoreInstanceLaunchedCommand command) {
await using var db = dbProvider.Lazy();
var entity = await FindInstanceEntity(db);
if (entity != null) {
entity.LaunchAutomatically = true;
}
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceLaunched(instanceGuid);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
private async Task StoreInstanceStopped(StoreInstanceStoppedCommand command) {
await using var db = dbProvider.Lazy();
var entity = await FindInstanceEntity(db);
if (entity != null) {
entity.LaunchAutomatically = false;
}
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceStopped(instanceGuid, command.StopStrategy.Seconds);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
private async Task StoreInstanceCommandSent(StoreInstanceCommandSentCommand command) {
await using var db = dbProvider.Lazy();
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
auditLogWriter.InstanceCommandExecuted(instanceGuid, command.Command);
await db.Ctx.SaveChangesAsync(cancellationToken);
}
}

View File

@ -187,7 +187,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> HandleCreateOrUpdateInstance(CreateOrUpdateInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.Configuration));
return agentManager.DoInstanceAction<AgentActor.CreateOrUpdateInstanceCommand, CreateOrUpdateInstanceResult>(message.Configuration.AgentGuid, new AgentActor.CreateOrUpdateInstanceCommand(message.LoggedInUserGuid, message.InstanceGuid, message.Configuration));
}
public Task<InstanceActionResult<LaunchInstanceResult>> HandleLaunchInstance(LaunchInstanceMessage message) {

View File

@ -55,24 +55,23 @@ try {
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
await controllerServices.Initialize();
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
}
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
try {
await Task.WhenAll(
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
);
} finally {
await rpcTaskManager.Stop();
NetMQConfig.Cleanup();
}
}
return 0;
} catch (OperationCanceledException) {

View File

@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
public readonly struct ActorConfiguration {
public SupervisorStrategy? SupervisorStrategy { get; init; }
public string? MailboxType { get; init; }
public int? StashCapacity { get; init; }
internal Props Apply(Props props) {
if (SupervisorStrategy != null) {
@ -15,6 +16,10 @@ public readonly struct ActorConfiguration {
props = props.WithMailbox(MailboxType);
}
if (StashCapacity != null) {
props = props.WithStashCapacity(StashCapacity.Value);
}
return props;
}
}

View File

@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
return Request(message, timeout: null, cancellationToken);
}
public Task<bool> Stop(TimeSpan? timeout = null) {
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
}
}

View File

@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
</ItemGroup>

View File

@ -2,6 +2,7 @@
namespace Phantom.Utils.Rpc;
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
public string LoggerName => "Rpc:" + ServiceName;
public string TcpUrl => "tcp://" + Host + ":" + Port;
}

View File

@ -0,0 +1,64 @@
using Akka.Actor;
using Akka.Event;
using Phantom.Utils.Actor;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Runtime;
namespace Phantom.Utils.Rpc;
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
SupervisorStrategy = SupervisorStrategies.Resume,
StashCapacity = 100
});
}
public IStash Stash { get; set; } = null!;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly MessageHandler<TServerListener> messageHandler;
private readonly RpcConnectionToClient<TClientListener> connection;
private RpcReceiverActor(Init init) {
this.messageDefinitions = init.MessageDefinitions;
this.messageHandler = init.MessageHandler;
this.connection = init.Connection;
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
}
public interface ICommand {}
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
Handle(command.Data);
if (await connection.GetAuthorization()) {
Stash.UnstashAll();
Become(() => {
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
});
}
}
else if (Stash.IsFull) {
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
}
else {
Stash.Stash();
}
}
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
Handle(command.Data);
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, messageHandler);
}
}

View File

@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using Akka.Actor;
using NetMQ.Sockets;
using Phantom.Utils.Actor;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Rpc.Sockets;
@ -9,25 +11,29 @@ using Serilog.Events;
namespace Phantom.Utils.Rpc.Runtime;
public static class RpcServerRuntime {
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
}
}
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
var socket = RpcServerSocket.Connect(config);
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
}
private readonly string serviceName;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
private readonly ActorSystem actorSystem;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
this.serviceName = socket.Config.ServiceName;
this.messageDefinitions = messageDefinitions;
this.listenerFactory = listenerFactory;
this.actorSystem = actorSystem;
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
this.cancellationToken = cancellationToken;
}
@ -62,19 +68,18 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
var clientLoggerName = LoggerName + ":" + routingId;
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
connection.Closed += OnConnectionClosed;
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
clients[routingId] = client;
client.EnqueueRegistrationMessage(messageType, data);
}
else {
client.Enqueue(messageType, data);
}
}
foreach (var client in clients.Values) {
client.Connection.Close();
@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
private sealed class Client : MessageHandler<TServerListener> {
public RpcConnectionToClient<TClientListener> Connection { get; }
private readonly RpcQueue processingQueue;
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
private readonly TaskManager taskManager;
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
this.Connection = connection;
this.Connection.Closed += OnConnectionClosed;
this.processingQueue = processingQueue;
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
this.messageDefinitions = messageDefinitions;
this.taskManager = taskManager;
}
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => Handle(data));
}
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
LogMessageType(messageType, data);
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
}
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
}
}
private void Handle(ReadOnlyMemory<byte> data) {
messageDefinitions.ToServer.Handle(data, this);
}
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
if (await Connection.GetAuthorization()) {
Handle(data);
}
else {
Logger.Warning("Dropped message after failed registration.");
}
}
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
}
@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
await StopReceiving();
await processingQueue.Stop();
await receiverActor.Stop();
await Connection.StopSending();
Logger.Debug("Connection closed.");
});

View File

@ -19,6 +19,6 @@ public sealed class AgentManager {
}
public ImmutableDictionary<Guid, Agent> ToDictionaryByGuid() {
return agents.Value.ToImmutableDictionary(static agent => agent.Configuration.AgentGuid);
return agents.Value.ToImmutableDictionary(static agent => agent.AgentGuid);
}
}

View File

@ -23,7 +23,7 @@ public sealed class InstanceManager {
public EventSubscribers<InstanceDictionary> InstancesChanged => instances.Subs;
internal void RefreshInstances(ImmutableArray<Instance> newInstances) {
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid));
instances.SetTo(newInstances.ToImmutableDictionary(static instance => instance.InstanceGuid));
}
public InstanceDictionary GetAll() {
@ -34,8 +34,8 @@ public sealed class InstanceManager {
return instances.Value.GetValueOrDefault(instanceGuid);
}
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, configuration);
public Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(Guid loggedInUserGuid, Guid instanceGuid, InstanceConfiguration configuration, CancellationToken cancellationToken) {
var message = new CreateOrUpdateInstanceMessage(loggedInUserGuid, instanceGuid, configuration);
return controllerConnection.Send<CreateOrUpdateInstanceMessage, InstanceActionResult<CreateOrUpdateInstanceResult>>(message, cancellationToken);
}

View File

@ -24,7 +24,7 @@
}
<Cell>
<p class="fw-semibold">@configuration.AgentName</p>
<small class="font-monospace text-uppercase">@configuration.AgentGuid.ToString()</small>
<small class="font-monospace text-uppercase">@agent.AgentGuid.ToString()</small>
</Cell>
<Cell class="text-end">
<ProgressBar Value="@(usedInstances ?? 0)" Maximum="@configuration.MaxInstances">
@ -73,7 +73,7 @@
protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => {
var sortedAgents = agents.Sort(static (a1, a2) => a1.Configuration.AgentName.CompareTo(a2.Configuration.AgentName));
agentTable.UpdateFrom(sortedAgents, static agent => agent.Configuration.AgentGuid, static agent => agent, static (agent, _) => agent);
agentTable.UpdateFrom(sortedAgents, static agent => agent.AgentGuid, static agent => agent, static (agent, _) => agent);
InvokeAsync(StateHasChanged);
});
}

View File

@ -58,7 +58,7 @@
try {
logItems = await AuditLogManager.GetMostRecentItems(50, cancellationToken);
userNamesByGuid = (await UserManager.GetAll(cancellationToken)).ToImmutableDictionary(static user => user.Guid, static user => user.Name);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
} finally {
initializationCancellationTokenSource.Dispose();
}

View File

@ -61,8 +61,8 @@
try {
logItems = await EventLogManager.GetMostRecentItems(50, cancellationToken);
agentNamesByGuid = AgentManager.GetAll().Select(static agent => agent.Configuration).ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.AgentName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.Configuration.InstanceGuid, static instance => instance.Configuration.InstanceName);
agentNamesByGuid = AgentManager.GetAll().ToImmutableDictionary(static kvp => kvp.AgentGuid, static kvp => kvp.Configuration.AgentName);
instanceNamesByGuid = InstanceManager.GetAll().Values.ToImmutableDictionary(static instance => instance.InstanceGuid, static instance => instance.Configuration.InstanceName);
} finally {
initializationCancellationTokenSource.Dispose();
}

View File

@ -3,4 +3,4 @@
@attribute [Authorize(Permission.CreateInstancesPolicy)]
<h1>New Instance</h1>
<InstanceAddOrEditForm EditedInstanceConfiguration="null" />
<InstanceAddOrEditForm EditedInstance="null" />

View File

@ -1,18 +1,18 @@
@page "/instances/{InstanceGuid:guid}/edit"
@attribute [Authorize(Permission.CreateInstancesPolicy)]
@using Phantom.Common.Data.Instance
@using Phantom.Common.Data.Web.Instance
@using Phantom.Common.Data.Web.Users
@using Phantom.Web.Services.Instances
@inherits Phantom.Web.Components.PhantomComponent
@inherits PhantomComponent
@inject InstanceManager InstanceManager
@if (InstanceConfiguration == null) {
@if (Instance == null) {
<h1>Instance Not Found</h1>
<p>Return to <a href="instances">all instances</a>.</p>
}
else {
<h1>Edit Instance: @InstanceConfiguration.InstanceName</h1>
<InstanceAddOrEditForm EditedInstanceConfiguration="InstanceConfiguration" />
<h1>Edit Instance: @Instance.Configuration.InstanceName</h1>
<InstanceAddOrEditForm EditedInstance="Instance" />
}
@code {
@ -20,10 +20,10 @@ else {
[Parameter]
public Guid InstanceGuid { get; init; }
private InstanceConfiguration? InstanceConfiguration { get; set; }
private Instance? Instance { get; set; }
protected override void OnInitialized() {
InstanceConfiguration = InstanceManager.GetByGuid(InstanceGuid)?.Configuration;
Instance = InstanceManager.GetByGuid(InstanceGuid);
}
}

View File

@ -16,7 +16,7 @@
<a href="instances/create" class="btn btn-primary" role="button">New Instance</a>
</PermissionView>
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.Configuration.InstanceGuid)">
<Table TItem="Instance" Items="instances" ItemUrl="@(static instance => "instances/" + instance.InstanceGuid)">
<HeaderRow>
<Column Width="40%">Agent</Column>
<Column Width="40%">Name</Column>
@ -35,7 +35,7 @@
</Cell>
<Cell>
<p class="fw-semibold">@configuration.InstanceName</p>
<small class="font-monospace text-uppercase">@configuration.InstanceGuid.ToString()</small>
<small class="font-monospace text-uppercase">@instance.InstanceGuid.ToString()</small>
</Cell>
<Cell>
<InstanceStatusText Status="instance.Status" />
@ -51,7 +51,7 @@
<p class="font-monospace">@configuration.MemoryAllocation.InMegabytes.ToString() MB</p>
</Cell>
<Cell>
<a href="instances/@configuration.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
<a href="instances/@instance.InstanceGuid.ToString()" class="btn btn-info btn-sm">Detail</a>
</Cell>
</ItemRow>
<NoItemsRow>
@ -66,7 +66,7 @@
protected override void OnInitialized() {
AgentManager.AgentsChanged.Subscribe(this, agents => {
this.agentNamesByGuid = agents.Select(static agent => agent.Configuration).ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.AgentName);
this.agentNamesByGuid = agents.ToImmutableDictionary(static agent => agent.AgentGuid, static agent => agent.Configuration.AgentName);
InvokeAsync(StateHasChanged);
});

View File

@ -8,9 +8,9 @@
@using Phantom.Common.Data.Web.Minecraft
@using Phantom.Common.Data.Web.Users
@using Phantom.Common.Messages.Web.ToController
@using Phantom.Common.Data.Instance
@using Phantom.Common.Data.Java
@using Phantom.Common.Data
@using Phantom.Common.Data.Instance
@using Phantom.Web.Services
@using Phantom.Web.Services.Agents
@using Phantom.Web.Services.Instances
@ -28,7 +28,7 @@
@{
static RenderFragment GetAgentOption(Agent agent) {
var configuration = agent.Configuration;
return @<option value="@configuration.AgentGuid">
return @<option value="@agent.AgentGuid">
@configuration.AgentName
&bullet;
@(agent.Stats?.RunningInstanceCount.ToString() ?? "?")/@(configuration.MaxInstances) @(configuration.MaxInstances == 1 ? "Instance" : "Instances")
@ -37,7 +37,7 @@
</option>;
}
}
@if (EditedInstanceConfiguration == null) {
@if (EditedInstance == null) {
<FormSelectInput Id="instance-agent" Label="Agent" @bind-Value="form.SelectedAgentGuid">
<option value="" selected>Select which agent will run the instance...</option>
@foreach (var agent in allAgentsByGuid.Values.Where(static agent => agent.ConnectionStatus is AgentIsOnline).OrderBy(static agent => agent.Configuration.AgentName)) {
@ -159,14 +159,14 @@
</div>
</div>
<FormButtonSubmit Label="@(EditedInstanceConfiguration == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
<FormButtonSubmit Label="@(EditedInstance == null ? "Create Instance" : "Edit Instance")" class="btn btn-primary" disabled="@(!IsSubmittable)" />
<FormSubmitError />
</Form>
@code {
[Parameter, EditorRequired]
public InstanceConfiguration? EditedInstanceConfiguration { get; init; }
public Instance? EditedInstance { get; init; }
private ConfigureInstanceFormModel form = null!;
@ -271,7 +271,7 @@
}
protected override void OnInitialized() {
form = new ConfigureInstanceFormModel(this, EditedInstanceConfiguration?.MemoryAllocation);
form = new ConfigureInstanceFormModel(this, EditedInstance?.Configuration.MemoryAllocation);
}
protected override async Task OnInitializedAsync() {
@ -282,18 +282,19 @@
allAgentJavaRuntimes = await agentJavaRuntimesTask;
allMinecraftVersions = await minecraftVersionsTask;
if (EditedInstanceConfiguration != null) {
form.SelectedAgentGuid = EditedInstanceConfiguration.AgentGuid;
form.InstanceName = EditedInstanceConfiguration.InstanceName;
form.ServerPort = EditedInstanceConfiguration.ServerPort;
form.RconPort = EditedInstanceConfiguration.RconPort;
form.MinecraftVersion = EditedInstanceConfiguration.MinecraftVersion;
form.MinecraftServerKind = EditedInstanceConfiguration.MinecraftServerKind;
form.MemoryUnits = EditedInstanceConfiguration.MemoryAllocation.RawValue;
form.JavaRuntimeGuid = EditedInstanceConfiguration.JavaRuntimeGuid;
form.JvmArguments = JvmArgumentsHelper.Join(EditedInstanceConfiguration.JvmArguments);
if (EditedInstance != null) {
var configuration = EditedInstance.Configuration;
form.SelectedAgentGuid = configuration.AgentGuid;
form.InstanceName = configuration.InstanceName;
form.ServerPort = configuration.ServerPort;
form.RconPort = configuration.RconPort;
form.MinecraftVersion = configuration.MinecraftVersion;
form.MinecraftServerKind = configuration.MinecraftServerKind;
form.MemoryUnits = configuration.MemoryAllocation.RawValue;
form.JavaRuntimeGuid = configuration.JavaRuntimeGuid;
form.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == EditedInstanceConfiguration.MinecraftVersion)?.Type ?? minecraftVersionType;
minecraftVersionType = allMinecraftVersions.FirstOrDefault(version => version.Id == configuration.MinecraftVersion)?.Type ?? minecraftVersionType;
}
form.EditContext.RevalidateWhenFieldChanges(tracked: nameof(ConfigureInstanceFormModel.SelectedAgentGuid), revalidated: nameof(ConfigureInstanceFormModel.MemoryUnits));
@ -328,9 +329,9 @@
return;
}
var instance = new InstanceConfiguration(
EditedInstanceConfiguration?.AgentGuid ?? selectedAgent.Configuration.AgentGuid,
EditedInstanceConfiguration?.InstanceGuid ?? Guid.NewGuid(),
var instanceGuid = EditedInstance?.InstanceGuid ?? Guid.NewGuid();
var instanceConfiguration = new InstanceConfiguration(
EditedInstance?.Configuration.AgentGuid ?? selectedAgent.AgentGuid,
form.InstanceName,
(ushort) form.ServerPort,
(ushort) form.RconPort,
@ -341,9 +342,9 @@
JvmArgumentsHelper.Split(form.JvmArguments)
);
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instance, CancellationToken);
var result = await InstanceManager.CreateOrUpdateInstance(loggedInUserGuid.Value, instanceGuid, instanceConfiguration, CancellationToken);
if (result.Is(CreateOrUpdateInstanceResult.Success)) {
await Navigation.NavigateTo("instances/" + instance.InstanceGuid);
await Navigation.NavigateTo("instances/" + instanceGuid);
}
else {
form.SubmitModel.StopSubmitting(result.ToSentence(CreateOrUpdateInstanceResultExtensions.ToSentence));