mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 07:42:58 +01:00
Compare commits
2 Commits
d241ed9f2f
...
ebc2db9c49
Author | SHA1 | Date | |
---|---|---|---|
ebc2db9c49 | |||
71acce3123 |
@ -21,9 +21,11 @@ sealed class KeepAliveLoop {
|
|||||||
|
|
||||||
private async Task Run() {
|
private async Task Run() {
|
||||||
var cancellationToken = cancellationTokenSource.Token;
|
var cancellationToken = cancellationTokenSource.Token;
|
||||||
|
|
||||||
Logger.Information("Started keep-alive loop.");
|
|
||||||
try {
|
try {
|
||||||
|
await connection.IsReady.WaitAsync(cancellationToken);
|
||||||
|
Logger.Information("Started keep-alive loop.");
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
await Task.Delay(KeepAliveInterval, cancellationToken);
|
await Task.Delay(KeepAliveInterval, cancellationToken);
|
||||||
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
|
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);
|
||||||
|
@ -40,6 +40,8 @@ public sealed class MessageListener : IMessageToAgentListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connection.SetIsReady();
|
||||||
|
|
||||||
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
|
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
|
||||||
await agent.InstanceSessionManager.RefreshAgentStatus();
|
await agent.InstanceSessionManager.RefreshAgentStatus();
|
||||||
|
|
||||||
|
@ -1,25 +1,21 @@
|
|||||||
using System.Collections.Immutable;
|
using System.Collections.Immutable;
|
||||||
using System.Diagnostics.CodeAnalysis;
|
|
||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Data.Instance;
|
using Phantom.Common.Data.Instance;
|
||||||
using Phantom.Common.Data.Java;
|
using Phantom.Common.Data.Java;
|
||||||
using Phantom.Common.Data.Minecraft;
|
|
||||||
using Phantom.Common.Data.Replies;
|
|
||||||
using Phantom.Common.Data.Web.Agent;
|
using Phantom.Common.Data.Web.Agent;
|
||||||
using Phantom.Common.Data.Web.Instance;
|
using Phantom.Common.Data.Web.Instance;
|
||||||
using Phantom.Common.Data.Web.Minecraft;
|
using Phantom.Common.Data.Web.Minecraft;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Controller.Database;
|
using Phantom.Controller.Database;
|
||||||
using Phantom.Controller.Minecraft;
|
using Phantom.Controller.Minecraft;
|
||||||
using Phantom.Controller.Services.Instances;
|
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Actor.Mailbox;
|
using Phantom.Utils.Actor.Mailbox;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc.Runtime;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Agents;
|
namespace Phantom.Controller.Services.Agents;
|
||||||
@ -30,7 +26,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
|
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
|
||||||
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
|
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
|
||||||
|
|
||||||
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, TaskManager TaskManager, CancellationToken CancellationToken);
|
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||||
|
|
||||||
public static Props<ICommand> Factory(Init init) {
|
public static Props<ICommand> Factory(Init init) {
|
||||||
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
||||||
@ -39,7 +35,6 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
private readonly ControllerState controllerState;
|
private readonly ControllerState controllerState;
|
||||||
private readonly MinecraftVersions minecraftVersions;
|
private readonly MinecraftVersions minecraftVersions;
|
||||||
private readonly IDbContextProvider dbProvider;
|
private readonly IDbContextProvider dbProvider;
|
||||||
private readonly TaskManager taskManager;
|
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private AgentConfiguration configuration;
|
private AgentConfiguration configuration;
|
||||||
@ -66,36 +61,32 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
|
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
|
||||||
|
private readonly ActorRef<AgentInstanceRouterActor.ICommand> instanceRouterActor;
|
||||||
|
|
||||||
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
|
|
||||||
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
|
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
|
||||||
|
|
||||||
private AgentActor(Init init) {
|
private AgentActor(Init init) {
|
||||||
this.controllerState = init.ControllerState;
|
this.controllerState = init.ControllerState;
|
||||||
this.minecraftVersions = init.MinecraftVersions;
|
this.minecraftVersions = init.MinecraftVersions;
|
||||||
this.dbProvider = init.DbProvider;
|
this.dbProvider = init.DbProvider;
|
||||||
this.taskManager = init.TaskManager;
|
|
||||||
this.cancellationToken = init.CancellationToken;
|
this.cancellationToken = init.CancellationToken;
|
||||||
|
|
||||||
this.configuration = init.Configuration;
|
this.configuration = init.Configuration;
|
||||||
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
|
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
|
||||||
|
|
||||||
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
|
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
|
||||||
|
this.instanceRouterActor = Context.ActorOf(AgentInstanceRouterActor.Factory(new AgentInstanceRouterActor.Init(SelfTyped, connection, minecraftVersions, dbProvider, cancellationToken)), "InstanceRouter");
|
||||||
|
|
||||||
NotifyAgentUpdated();
|
NotifyAgentUpdated();
|
||||||
|
|
||||||
ReceiveAsync<InitializeCommand>(Initialize);
|
ReceiveAsync<InitializeCommand>(Initialize);
|
||||||
ReceiveAndReply<RegisterCommand, ImmutableArray<Instance>>(Register);
|
ReceiveAsyncAndReply<RegisterCommand, ImmutableArray<ConfigureInstanceMessage>>(Register);
|
||||||
Receive<UnregisterCommand>(Unregister);
|
Receive<UnregisterCommand>(Unregister);
|
||||||
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
|
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
|
||||||
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
|
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
|
||||||
Receive<UpdateStatsCommand>(UpdateStats);
|
Receive<UpdateStatsCommand>(UpdateStats);
|
||||||
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
|
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
|
||||||
ReceiveAsyncAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
Receive<RouteToInstanceCommand>(RouteToInstance);
|
||||||
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
|
|
||||||
Receive<LaunchInstanceCommand>(LaunchInstance);
|
|
||||||
Receive<StopInstanceCommand>(StopInstance);
|
|
||||||
Receive<SendMinecraftCommandCommand>(SendMinecraftCommand);
|
|
||||||
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
|
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,12 +100,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
|
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
|
private void CreateNewInstance(Instance instance) {
|
||||||
UpdateInstanceData(instance);
|
UpdateInstanceData(instance);
|
||||||
|
instanceRouterActor.Tell(new AgentInstanceRouterActor.InitializeInstanceCommand(instance));
|
||||||
var instanceActor = CreateInstanceActor(instance);
|
|
||||||
instanceActorByGuid.Add(instance.Configuration.InstanceGuid, instanceActor);
|
|
||||||
return instanceActor;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void UpdateInstanceData(Instance instance) {
|
private void UpdateInstanceData(Instance instance) {
|
||||||
@ -122,26 +110,22 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
controllerState.UpdateInstance(instance);
|
controllerState.UpdateInstance(instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
|
||||||
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
|
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
|
||||||
var name = "Instance:" + instance.Configuration.InstanceGuid;
|
|
||||||
return Context.ActorOf(InstanceActor.Factory(init), name);
|
foreach (var (instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
|
||||||
}
|
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
|
||||||
|
configurationMessages.Add(new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
|
||||||
|
}
|
||||||
|
|
||||||
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
|
return configurationMessages.ToImmutable();
|
||||||
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
|
||||||
instance.Forward(command);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ICommand {}
|
public interface ICommand {}
|
||||||
|
|
||||||
private sealed record InitializeCommand : ICommand;
|
private sealed record InitializeCommand : ICommand;
|
||||||
|
|
||||||
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<Instance>>;
|
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
|
||||||
|
|
||||||
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
|
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
|
||||||
|
|
||||||
@ -153,15 +137,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
|
|
||||||
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
|
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
|
||||||
|
|
||||||
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
|
public sealed record RouteToInstanceCommand(AgentInstanceRouterActor.ICommand Command) : ICommand;
|
||||||
|
|
||||||
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
|
|
||||||
|
|
||||||
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
|
||||||
|
|
||||||
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
|
||||||
|
|
||||||
public sealed record SendMinecraftCommandCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
|
||||||
|
|
||||||
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
|
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
|
||||||
|
|
||||||
@ -185,19 +161,21 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ImmutableArray<Instance> Register(RegisterCommand command) {
|
private async Task<ImmutableArray<ConfigureInstanceMessage>> Register(RegisterCommand command) {
|
||||||
|
var configurationMessages = await PrepareInitialConfigurationMessages();
|
||||||
|
|
||||||
configuration = command.Configuration;
|
configuration = command.Configuration;
|
||||||
connection.UpdateConnection(command.Connection, configuration.AgentName);
|
connection.UpdateConnection(command.Connection, configuration.AgentName);
|
||||||
|
|
||||||
lastPingTime = DateTimeOffset.Now;
|
lastPingTime = DateTimeOffset.Now;
|
||||||
isOnline = true;
|
isOnline = true;
|
||||||
|
|
||||||
NotifyAgentUpdated();
|
NotifyAgentUpdated();
|
||||||
|
|
||||||
|
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||||
|
|
||||||
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
|
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
|
||||||
|
|
||||||
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
return configurationMessages;
|
||||||
return instanceDataByGuid.Values.ToImmutableArray();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void Unregister(UnregisterCommand command) {
|
private void Unregister(UnregisterCommand command) {
|
||||||
@ -207,11 +185,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
isOnline = false;
|
isOnline = false;
|
||||||
NotifyAgentUpdated();
|
NotifyAgentUpdated();
|
||||||
|
|
||||||
var setStatusCommand = new InstanceActor.SetStatusCommand(InstanceStatus.Offline);
|
instanceRouterActor.Tell(new AgentInstanceRouterActor.MarkInstancesAsOfflineCommand());
|
||||||
|
|
||||||
foreach (var instance in instanceActorByGuid.Values) {
|
|
||||||
instance.Tell(setStatusCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
|
||||||
}
|
}
|
||||||
@ -244,83 +218,11 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
javaRuntimes = command.JavaRuntimes;
|
javaRuntimes = command.JavaRuntimes;
|
||||||
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
|
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2) {
|
|
||||||
var instanceConfiguration = command.Configuration;
|
|
||||||
var instanceName = instanceConfiguration.InstanceName;
|
|
||||||
var instanceGuid = instanceConfiguration.InstanceGuid;
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(instanceName)) {
|
|
||||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
|
|
||||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
|
|
||||||
if (serverExecutableInfo == null) {
|
|
||||||
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isNewInstance = !instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
|
|
||||||
if (isNewInstance) {
|
|
||||||
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
|
|
||||||
}
|
|
||||||
|
|
||||||
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isNewInstance);
|
|
||||||
taskManager.Run("Configure instance " + instanceGuid, async () => await ConfigureInstance(instanceActorRef, configureInstanceCommand, configuration.AgentName, result2, cancellationToken));
|
|
||||||
}
|
|
||||||
|
|
||||||
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
|
private void RouteToInstance(RouteToInstanceCommand command) {
|
||||||
private static async Task ConfigureInstance(ActorRef<InstanceActor.ICommand> instanceActorRef, InstanceActor.ConfigureInstanceCommand command, string agentName, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2, CancellationToken cancellationToken) {
|
instanceRouterActor.Forward(command.Command);
|
||||||
var instanceName = command.Configuration.InstanceName;
|
|
||||||
var instanceGuid = command.Configuration.InstanceGuid;
|
|
||||||
|
|
||||||
var result = await instanceActorRef.Request(command, cancellationToken);
|
|
||||||
|
|
||||||
if (result.Is(ConfigureInstanceResult.Success)) {
|
|
||||||
if (command.IsNewInstance) {
|
|
||||||
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (command.IsNewInstance) {
|
|
||||||
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
result2.SetResult(result.Map(static result => result switch {
|
|
||||||
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
|
|
||||||
_ => CreateOrUpdateInstanceResult.UnknownError
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
|
|
||||||
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void LaunchInstance(LaunchInstanceCommand command) {
|
|
||||||
TellInstance(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void StopInstance(StopInstanceCommand command) {
|
|
||||||
TellInstance(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void SendMinecraftCommand(SendMinecraftCommandCommand command) {
|
|
||||||
TellInstance(command.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(command.AuditLogUserGuid, command.Command));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
|
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
|
||||||
UpdateInstanceData(command.Instance);
|
UpdateInstanceData(command.Instance);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,188 @@
|
|||||||
|
using Phantom.Common.Data;
|
||||||
|
using Phantom.Common.Data.Instance;
|
||||||
|
using Phantom.Common.Data.Minecraft;
|
||||||
|
using Phantom.Common.Data.Replies;
|
||||||
|
using Phantom.Common.Data.Web.Instance;
|
||||||
|
using Phantom.Controller.Database;
|
||||||
|
using Phantom.Controller.Minecraft;
|
||||||
|
using Phantom.Controller.Services.Instances;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Actor.Mailbox;
|
||||||
|
using Phantom.Utils.Actor.Tasks;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace Phantom.Controller.Services.Agents;
|
||||||
|
|
||||||
|
sealed class AgentInstanceRouterActor : ReceiveActor<AgentInstanceRouterActor.ICommand> {
|
||||||
|
private static readonly ILogger Logger = PhantomLogger.Create<AgentInstanceRouterActor>();
|
||||||
|
|
||||||
|
public readonly record struct Init(ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
|
||||||
|
|
||||||
|
public static Props<ICommand> Factory(Init init) {
|
||||||
|
return Props<ICommand>.Create(() => new AgentInstanceRouterActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
|
||||||
|
private readonly AgentConnection agentConnection;
|
||||||
|
private readonly MinecraftVersions minecraftVersions;
|
||||||
|
private readonly IDbContextProvider dbProvider;
|
||||||
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
|
||||||
|
|
||||||
|
private AgentInstanceRouterActor(Init init) {
|
||||||
|
this.agentActorRef = init.AgentActorRef;
|
||||||
|
this.agentConnection = init.AgentConnection;
|
||||||
|
this.minecraftVersions = init.MinecraftVersions;
|
||||||
|
this.dbProvider = init.DbProvider;
|
||||||
|
this.cancellationToken = init.CancellationToken;
|
||||||
|
|
||||||
|
Receive<InitializeInstanceCommand>(InitializeInstance);
|
||||||
|
Receive<MarkInstancesAsOfflineCommand>(MarkInstancesAsOffline);
|
||||||
|
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
|
||||||
|
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
|
||||||
|
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
||||||
|
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
||||||
|
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
|
||||||
|
var instanceGuid = instance.Configuration.InstanceGuid;
|
||||||
|
|
||||||
|
if (instanceActorByGuid.ContainsKey(instanceGuid)) {
|
||||||
|
throw new InvalidOperationException("Instance already exists: " + instanceGuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
var instanceActor = CreateInstanceActor(instance);
|
||||||
|
instanceActorByGuid.Add(instanceGuid, instanceActor);
|
||||||
|
return instanceActor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
|
||||||
|
var init = new InstanceActor.Init(instance, agentActorRef, agentConnection, dbProvider, cancellationToken);
|
||||||
|
var name = "Instance:" + instance.Configuration.InstanceGuid;
|
||||||
|
return Context.ActorOf(InstanceActor.Factory(init), name);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
|
||||||
|
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||||
|
instance.Tell(command);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TellAllInstances(InstanceActor.ICommand command) {
|
||||||
|
foreach (var instance in instanceActorByGuid.Values) {
|
||||||
|
instance.Tell(command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||||
|
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
|
||||||
|
return instance.Request(command, cancellationToken);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
|
||||||
|
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface ICommand {}
|
||||||
|
|
||||||
|
public sealed record InitializeInstanceCommand(Instance Instance) : ICommand;
|
||||||
|
|
||||||
|
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
|
||||||
|
|
||||||
|
public sealed record MarkInstancesAsOfflineCommand : ICommand;
|
||||||
|
|
||||||
|
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
|
||||||
|
|
||||||
|
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||||
|
|
||||||
|
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||||
|
|
||||||
|
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||||
|
|
||||||
|
private void InitializeInstance(InitializeInstanceCommand command) {
|
||||||
|
CreateNewInstance(command.Instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void MarkInstancesAsOffline(MarkInstancesAsOfflineCommand command) {
|
||||||
|
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
|
||||||
|
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
|
||||||
|
var instanceConfiguration = command.Configuration;
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
|
||||||
|
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
|
||||||
|
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
|
||||||
|
}
|
||||||
|
|
||||||
|
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
|
||||||
|
.ContinueOnActor(CreateOrUpdateInstance1, command)
|
||||||
|
.Unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
|
||||||
|
if (serverExecutableInfo == null) {
|
||||||
|
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
|
||||||
|
}
|
||||||
|
|
||||||
|
var instanceConfiguration = command.Configuration;
|
||||||
|
|
||||||
|
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(instanceConfiguration.InstanceGuid, out var instanceActorRef);
|
||||||
|
if (isCreatingInstance) {
|
||||||
|
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
|
||||||
|
}
|
||||||
|
|
||||||
|
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
|
||||||
|
|
||||||
|
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
|
||||||
|
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
|
||||||
|
var instanceName = command.Configuration.InstanceName;
|
||||||
|
var instanceGuid = command.Configuration.InstanceGuid;
|
||||||
|
var isCreating = command.IsCreatingInstance;
|
||||||
|
|
||||||
|
if (result.Is(ConfigureInstanceResult.Success)) {
|
||||||
|
string action = isCreating ? "Added" : "Edited";
|
||||||
|
string relation = isCreating ? "to agent" : "in agent";
|
||||||
|
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
string action = isCreating ? "adding" : "editing";
|
||||||
|
string relation = isCreating ? "to agent" : "in agent";
|
||||||
|
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.Map(static result => result switch {
|
||||||
|
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
|
||||||
|
_ => CreateOrUpdateInstanceResult.UnknownError
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
|
||||||
|
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
|
||||||
|
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
|
||||||
|
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
|
||||||
|
}
|
||||||
|
}
|
@ -1,12 +1,9 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Immutable;
|
|
||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Phantom.Common.Data;
|
using Phantom.Common.Data;
|
||||||
using Phantom.Common.Data.Agent;
|
using Phantom.Common.Data.Agent;
|
||||||
using Phantom.Common.Data.Instance;
|
|
||||||
using Phantom.Common.Data.Replies;
|
using Phantom.Common.Data.Replies;
|
||||||
using Phantom.Common.Data.Web.Agent;
|
using Phantom.Common.Data.Web.Agent;
|
||||||
using Phantom.Common.Data.Web.Instance;
|
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Controller.Database;
|
using Phantom.Controller.Database;
|
||||||
@ -68,23 +65,12 @@ sealed class AgentManager {
|
|||||||
|
|
||||||
var agentProperties = AgentConfiguration.From(agentInfo);
|
var agentProperties = AgentConfiguration.From(agentInfo);
|
||||||
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
||||||
var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
||||||
await connection.Send(new RegisterAgentSuccessMessage(await GetInstanceConfigurationsForAgent(agentInstances)));
|
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(ImmutableArray<Instance> instances) {
|
|
||||||
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
|
|
||||||
|
|
||||||
foreach (var (configuration, _, launchAutomatically) in instances) {
|
|
||||||
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
|
|
||||||
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
|
|
||||||
}
|
|
||||||
|
|
||||||
return configurationMessages.ToImmutable();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
||||||
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
||||||
agent.Tell(command);
|
agent.Tell(command);
|
||||||
@ -96,9 +82,9 @@ sealed class AgentManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentInstanceRouterActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
|
||||||
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
||||||
return await agent.Request(command, cancellationToken);
|
return await agent.Request(new AgentActor.RouteToInstanceCommand(command), cancellationToken);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
|
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);
|
||||||
|
@ -46,7 +46,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
|||||||
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
|
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
|
||||||
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
|
||||||
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
|
||||||
ReceiveAsyncAndReply<SendMinecraftCommandCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
ReceiveAsyncAndReply<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void NotifyInstanceUpdated() {
|
private void NotifyInstanceUpdated() {
|
||||||
@ -63,13 +63,13 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
|||||||
|
|
||||||
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
|
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
|
||||||
|
|
||||||
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsNewInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
|
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
|
||||||
|
|
||||||
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
|
||||||
|
|
||||||
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
|
||||||
|
|
||||||
public sealed record SendMinecraftCommandCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
public sealed record SendCommandToInstanceCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
|
||||||
|
|
||||||
private void SetStatus(SetStatusCommand command) {
|
private void SetStatus(SetStatusCommand command) {
|
||||||
status = command.Status;
|
status = command.Status;
|
||||||
@ -98,11 +98,11 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
|||||||
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
|
||||||
|
|
||||||
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
|
||||||
if (command.IsNewInstance) {
|
if (command.IsCreatingInstance) {
|
||||||
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
|
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
|
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
await db.Ctx.SaveChangesAsync(cancellationToken);
|
await db.Ctx.SaveChangesAsync(cancellationToken);
|
||||||
@ -148,7 +148,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendMinecraftCommandCommand command) {
|
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
|
||||||
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
|
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
|
||||||
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
|
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
|
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
|
||||||
|
|
||||||
private static int listenerSequenceId = 0;
|
private static int listenerSequenceId = 0;
|
||||||
|
|
||||||
private readonly ActorRef<ICommand> actor;
|
private readonly ActorRef<ICommand> actor;
|
||||||
private readonly RpcConnectionToClient<IMessageToWebListener> connection;
|
private readonly RpcConnectionToClient<IMessageToWebListener> connection;
|
||||||
private readonly AuthToken authToken;
|
private readonly AuthToken authToken;
|
||||||
@ -81,30 +81,30 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private readonly WebMessageListener listener;
|
private readonly WebMessageListener listener;
|
||||||
|
|
||||||
private Actor(WebMessageListener listener) {
|
private Actor(WebMessageListener listener) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
|
||||||
Receive<StartConnectionCommand>(StartConnection);
|
Receive<StartConnectionCommand>(StartConnection);
|
||||||
Receive<StopConnectionCommand>(StopConnection);
|
Receive<StopConnectionCommand>(StopConnection);
|
||||||
Receive<RefreshAgentsCommand>(RefreshAgents);
|
Receive<RefreshAgentsCommand>(RefreshAgents);
|
||||||
Receive<RefreshInstancesCommand>(RefreshInstances);
|
Receive<RefreshInstancesCommand>(RefreshInstances);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void StartConnection(StartConnectionCommand command) {
|
private void StartConnection(StartConnectionCommand command) {
|
||||||
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
|
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
|
||||||
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
|
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
|
||||||
|
|
||||||
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
|
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void StopConnection(StopConnectionCommand command) {
|
private void StopConnection(StopConnectionCommand command) {
|
||||||
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
|
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
|
||||||
|
|
||||||
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
|
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
|
||||||
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
|
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void RefreshAgents(RefreshAgentsCommand command) {
|
private void RefreshAgents(RefreshAgentsCommand command) {
|
||||||
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
|
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
|
||||||
listener.connection.Send(message);
|
listener.connection.Send(message);
|
||||||
@ -114,7 +114,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
|
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
|
||||||
listener.connection.Send(message);
|
listener.connection.Send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
|
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
|
||||||
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
|
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
|
||||||
}
|
}
|
||||||
@ -123,11 +123,11 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
private interface ICommand {}
|
private interface ICommand {}
|
||||||
|
|
||||||
private sealed record StartConnectionCommand : ICommand;
|
private sealed record StartConnectionCommand : ICommand;
|
||||||
|
|
||||||
private sealed record StopConnectionCommand : ICommand;
|
private sealed record StopConnectionCommand : ICommand;
|
||||||
|
|
||||||
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
|
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
|
||||||
|
|
||||||
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
|
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
|
||||||
|
|
||||||
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
|
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
|
||||||
@ -199,7 +199,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
|
||||||
return agentManager.DoInstanceAction<AgentActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendMinecraftCommandCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
|
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
|
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {
|
||||||
|
@ -5,47 +5,42 @@ namespace Phantom.Utils.Actor;
|
|||||||
public abstract class ReceiveActor<TMessage> : ReceiveActor {
|
public abstract class ReceiveActor<TMessage> : ReceiveActor {
|
||||||
protected ActorRef<TMessage> SelfTyped => new (Self);
|
protected ActorRef<TMessage> SelfTyped => new (Self);
|
||||||
|
|
||||||
protected void ReceiveAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
protected void ReceiveAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
Receive<TReplyableCommand>(message => HandleMessageWithReply(action, message));
|
Receive<TReplyableMessage>(message => HandleMessageWithReply(action, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void ReceiveAsyncAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
protected void ReceiveAndReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
|
// Must be async to set default task scheduler to actor scheduler.
|
||||||
|
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyLater(action, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void ReceiveAsyncAndReplyLater<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
protected void ReceiveAsyncAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
|
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyAsync(action, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleMessageWithReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
private void HandleMessageWithReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
try {
|
try {
|
||||||
Sender.Tell(action(message), Self);
|
Sender.Tell(action(message), Self);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Sender.Tell(new Status.Failure(e), Self);
|
Sender.Tell(new Status.Failure(e), Self);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
private Task HandleMessageWithReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
try {
|
try {
|
||||||
Sender.Tell(await action(message), Self);
|
action(message).PipeTo(Sender, Self);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Sender.Tell(new Status.Failure(e), Self);
|
Sender.Tell(new Status.Failure(e), Self);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
|
|
||||||
var taskCompletionSource = new TaskCompletionSource<TReply>();
|
|
||||||
var sender = Sender;
|
|
||||||
|
|
||||||
taskCompletionSource.Task.ContinueWith(task => ((ICanTell) sender).Tell())
|
return Task.CompletedTask;
|
||||||
try {
|
|
||||||
Sender.Tell(await action(message), Self);
|
|
||||||
} catch (Exception e) {
|
|
||||||
Sender.Tell(new Status.Failure(e), Self);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void CompleteWith<TReply>(TaskCompletionSource<TReply> taskCompletionSource, Func<Task<TReply>> task) {
|
private async Task HandleMessageWithReplyAsync<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
|
||||||
|
try {
|
||||||
|
Sender.Tell(await action(message), Self);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Sender.Tell(new Status.Failure(e), Self);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
33
Utils/Phantom.Utils.Actor/Tasks/TaskExtensions.cs
Normal file
33
Utils/Phantom.Utils.Actor/Tasks/TaskExtensions.cs
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
using Akka.Dispatch;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Actor.Tasks;
|
||||||
|
|
||||||
|
public static class TaskExtensions {
|
||||||
|
public static Task<TResult> ContinueOnActor<TSource, TResult>(this Task<TSource> task, Func<TSource, TResult> mapper) {
|
||||||
|
if (TaskScheduler.Current is not ActorTaskScheduler actorTaskScheduler) {
|
||||||
|
throw new InvalidOperationException("Task must be scheduled in Actor context!");
|
||||||
|
}
|
||||||
|
|
||||||
|
var continuationCompletionSource = new TaskCompletionSource<TResult>();
|
||||||
|
var continuationTask = task.ContinueWith(t => MapResult(t, mapper, continuationCompletionSource), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, actorTaskScheduler);
|
||||||
|
return continuationTask.Unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Task<TResult> ContinueOnActor<TSource, TArg, TResult>(this Task<TSource> task, Func<TSource, TArg, TResult> mapper, TArg arg) {
|
||||||
|
return task.ContinueOnActor(result => mapper(result, arg));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<TResult> MapResult<TSource, TResult>(Task<TSource> task, Func<TSource, TResult> mapper, TaskCompletionSource<TResult> completionSource) {
|
||||||
|
if (task.IsFaulted) {
|
||||||
|
completionSource.SetException(task.Exception.InnerExceptions);
|
||||||
|
}
|
||||||
|
else if (task.IsCanceled) {
|
||||||
|
completionSource.SetCanceled();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
completionSource.SetResult(mapper(task.Result));
|
||||||
|
}
|
||||||
|
|
||||||
|
return completionSource.Task;
|
||||||
|
}
|
||||||
|
}
|
@ -1,16 +1,24 @@
|
|||||||
using NetMQ;
|
using NetMQ;
|
||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Tasks;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
|
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
|
||||||
private readonly ClientSocket socket;
|
private readonly ClientSocket socket;
|
||||||
|
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
|
||||||
|
|
||||||
|
public Task IsReady => isReady.Task;
|
||||||
|
|
||||||
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
|
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void SetIsReady() {
|
||||||
|
isReady.TrySetResult();
|
||||||
|
}
|
||||||
|
|
||||||
private protected override ValueTask Send(byte[] bytes) {
|
private protected override ValueTask Send(byte[] bytes) {
|
||||||
return socket.SendAsync(bytes);
|
return socket.SendAsync(bytes);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user