1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-19 09:42:50 +02:00

Compare commits

..

2 Commits

Author SHA1 Message Date
ebc2db9c49
WIP? 2024-03-09 21:58:22 +01:00
71acce3123
Implement actors in Controller via Akka.NET 2024-03-09 14:05:49 +01:00
10 changed files with 303 additions and 187 deletions

View File

@ -22,8 +22,10 @@ sealed class KeepAliveLoop {
private async Task Run() {
var cancellationToken = cancellationTokenSource.Token;
Logger.Information("Started keep-alive loop.");
try {
await connection.IsReady.WaitAsync(cancellationToken);
Logger.Information("Started keep-alive loop.");
while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);

View File

@ -40,6 +40,8 @@ public sealed class MessageListener : IMessageToAgentListener {
}
}
connection.SetIsReady();
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await agent.InstanceSessionManager.RefreshAgentStatus();

View File

@ -1,25 +1,21 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog;
namespace Phantom.Controller.Services.Agents;
@ -30,7 +26,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, TaskManager TaskManager, CancellationToken CancellationToken);
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
@ -39,7 +35,6 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken;
private AgentConfiguration configuration;
@ -66,36 +61,32 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
}
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly ActorRef<AgentInstanceRouterActor.ICommand> instanceRouterActor;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
private AgentActor(Init init) {
this.controllerState = init.ControllerState;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.taskManager = init.TaskManager;
this.cancellationToken = init.CancellationToken;
this.configuration = init.Configuration;
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
this.instanceRouterActor = Context.ActorOf(AgentInstanceRouterActor.Factory(new AgentInstanceRouterActor.Init(SelfTyped, connection, minecraftVersions, dbProvider, cancellationToken)), "InstanceRouter");
NotifyAgentUpdated();
ReceiveAsync<InitializeCommand>(Initialize);
ReceiveAndReply<RegisterCommand, ImmutableArray<Instance>>(Register);
ReceiveAsyncAndReply<RegisterCommand, ImmutableArray<ConfigureInstanceMessage>>(Register);
Receive<UnregisterCommand>(Unregister);
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
Receive<NotifyIsAliveCommand>(NotifyIsAlive);
Receive<UpdateStatsCommand>(UpdateStats);
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
ReceiveAsyncAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
Receive<LaunchInstanceCommand>(LaunchInstance);
Receive<StopInstanceCommand>(StopInstance);
Receive<SendMinecraftCommandCommand>(SendMinecraftCommand);
Receive<RouteToInstanceCommand>(RouteToInstance);
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
}
@ -109,12 +100,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
private void CreateNewInstance(Instance instance) {
UpdateInstanceData(instance);
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instance.Configuration.InstanceGuid, instanceActor);
return instanceActor;
instanceRouterActor.Tell(new AgentInstanceRouterActor.InitializeInstanceCommand(instance));
}
private void UpdateInstanceData(Instance instance) {
@ -122,26 +110,22 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
controllerState.UpdateInstance(instance);
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken);
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Forward(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
foreach (var (instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
public interface ICommand {}
private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<Instance>>;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
@ -153,15 +137,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record RouteToInstanceCommand(AgentInstanceRouterActor.ICommand Command) : ICommand;
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
@ -185,19 +161,21 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
}
}
private ImmutableArray<Instance> Register(RegisterCommand command) {
private async Task<ImmutableArray<ConfigureInstanceMessage>> Register(RegisterCommand command) {
var configurationMessages = await PrepareInitialConfigurationMessages();
configuration = command.Configuration;
connection.UpdateConnection(command.Connection, configuration.AgentName);
lastPingTime = DateTimeOffset.Now;
isOnline = true;
NotifyAgentUpdated();
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
return instanceDataByGuid.Values.ToImmutableArray();
return configurationMessages;
}
private void Unregister(UnregisterCommand command) {
@ -207,11 +185,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
isOnline = false;
NotifyAgentUpdated();
var setStatusCommand = new InstanceActor.SetStatusCommand(InstanceStatus.Offline);
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(setStatusCommand);
}
instanceRouterActor.Tell(new AgentInstanceRouterActor.MarkInstancesAsOfflineCommand());
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
}
@ -245,80 +219,8 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
}
private async Task CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2) {
var instanceConfiguration = command.Configuration;
var instanceName = instanceConfiguration.InstanceName;
var instanceGuid = instanceConfiguration.InstanceGuid;
if (string.IsNullOrWhiteSpace(instanceName)) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
return;
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
return;
}
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
if (serverExecutableInfo == null) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
return;
}
bool isNewInstance = !instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
if (isNewInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isNewInstance);
taskManager.Run("Configure instance " + instanceGuid, async () => await ConfigureInstance(instanceActorRef, configureInstanceCommand, configuration.AgentName, result2, cancellationToken));
}
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")]
private static async Task ConfigureInstance(ActorRef<InstanceActor.ICommand> instanceActorRef, InstanceActor.ConfigureInstanceCommand command, string agentName, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2, CancellationToken cancellationToken) {
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var result = await instanceActorRef.Request(command, cancellationToken);
if (result.Is(ConfigureInstanceResult.Success)) {
if (command.IsNewInstance) {
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
else {
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
}
else {
if (command.IsNewInstance) {
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
else {
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
}
result2.SetResult(result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
}));
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private void LaunchInstance(LaunchInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private void StopInstance(StopInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private void SendMinecraftCommand(SendMinecraftCommandCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(command.AuditLogUserGuid, command.Command));
private void RouteToInstance(RouteToInstanceCommand command) {
instanceRouterActor.Forward(command.Command);
}
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {

View File

@ -0,0 +1,188 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Actor.Tasks;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentInstanceRouterActor : ReceiveActor<AgentInstanceRouterActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentInstanceRouterActor>();
public readonly record struct Init(ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentInstanceRouterActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private AgentInstanceRouterActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<InitializeInstanceCommand>(InitializeInstance);
Receive<MarkInstancesAsOfflineCommand>(MarkInstancesAsOffline);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
var instanceGuid = instance.Configuration.InstanceGuid;
if (instanceActorByGuid.ContainsKey(instanceGuid)) {
throw new InvalidOperationException("Instance already exists: " + instanceGuid);
}
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instanceGuid, instanceActor);
return instanceActor;
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, agentActorRef, agentConnection, dbProvider, cancellationToken);
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Tell(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
private void TellAllInstances(InstanceActor.ICommand command) {
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(command);
}
}
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
return instance.Request(command, cancellationToken);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
}
}
public interface ICommand {}
public sealed record InitializeInstanceCommand(Instance Instance) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record MarkInstancesAsOfflineCommand : ICommand;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void InitializeInstance(InitializeInstanceCommand command) {
CreateNewInstance(command.Instance);
}
private void MarkInstancesAsOffline(MarkInstancesAsOfflineCommand command) {
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
var instanceConfiguration = command.Configuration;
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
}
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance1, command)
.Unwrap();
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
if (serverExecutableInfo == null) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
}
var instanceConfiguration = command.Configuration;
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(instanceConfiguration.InstanceGuid, out var instanceActorRef);
if (isCreatingInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
}
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var isCreating = command.IsCreatingInstance;
if (result.Is(ConfigureInstanceResult.Success)) {
string action = isCreating ? "Added" : "Edited";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
}
else {
string action = isCreating ? "adding" : "editing";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
return result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
}
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
}
}

View File

@ -1,12 +1,9 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Akka.Actor;
using Phantom.Common.Data;
using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database;
@ -68,23 +65,12 @@ sealed class AgentManager {
var agentProperties = AgentConfiguration.From(agentInfo);
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(await GetInstanceConfigurationsForAgent(agentInstances)));
var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
return true;
}
private async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(ImmutableArray<Instance> instances) {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (configuration, _, launchAutomatically) in instances) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
agent.Tell(command);
@ -96,9 +82,9 @@ sealed class AgentManager {
}
}
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentInstanceRouterActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
return await agent.Request(command, cancellationToken);
return await agent.Request(new AgentActor.RouteToInstanceCommand(command), cancellationToken);
}
else {
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);

View File

@ -46,7 +46,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAsyncAndReply<SendMinecraftCommandCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
ReceiveAsyncAndReply<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private void NotifyInstanceUpdated() {
@ -63,13 +63,13 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsNewInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void SetStatus(SetStatusCommand command) {
status = command.Status;
@ -98,11 +98,11 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsNewInstance) {
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
if (command.IsCreatingInstance) {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
}
else {
auditLogWriter.InstanceCreated(configuration.InstanceGuid);
auditLogWriter.InstanceEdited(configuration.InstanceGuid);
}
await db.Ctx.SaveChangesAsync(cancellationToken);
@ -148,7 +148,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
}
}
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendMinecraftCommandCommand command) {
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);

View File

@ -199,7 +199,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
}
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendMinecraftCommandCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
}
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {

View File

@ -5,19 +5,20 @@ namespace Phantom.Utils.Actor;
public abstract class ReceiveActor<TMessage> : ReceiveActor {
protected ActorRef<TMessage> SelfTyped => new (Self);
protected void ReceiveAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
Receive<TReplyableCommand>(message => HandleMessageWithReply(action, message));
protected void ReceiveAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
Receive<TReplyableMessage>(message => HandleMessageWithReply(action, message));
}
protected void ReceiveAsyncAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
protected void ReceiveAndReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
// Must be async to set default task scheduler to actor scheduler.
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyLater(action, message));
}
protected void ReceiveAsyncAndReplyLater<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action) where TReplyableCommand : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message));
protected void ReceiveAsyncAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyAsync(action, message));
}
private void HandleMessageWithReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
private void HandleMessageWithReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
Sender.Tell(action(message), Self);
} catch (Exception e) {
@ -25,27 +26,21 @@ public abstract class ReceiveActor<TMessage> : ReceiveActor {
}
}
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
private Task HandleMessageWithReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
action(message).PipeTo(Sender, Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
return Task.CompletedTask;
}
private async Task HandleMessageWithReplyAsync<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
var taskCompletionSource = new TaskCompletionSource<TReply>();
var sender = Sender;
taskCompletionSource.Task.ContinueWith(task => ((ICanTell) sender).Tell())
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
}
protected void CompleteWith<TReply>(TaskCompletionSource<TReply> taskCompletionSource, Func<Task<TReply>> task) {
}
}

View File

@ -0,0 +1,33 @@
using Akka.Dispatch;
namespace Phantom.Utils.Actor.Tasks;
public static class TaskExtensions {
public static Task<TResult> ContinueOnActor<TSource, TResult>(this Task<TSource> task, Func<TSource, TResult> mapper) {
if (TaskScheduler.Current is not ActorTaskScheduler actorTaskScheduler) {
throw new InvalidOperationException("Task must be scheduled in Actor context!");
}
var continuationCompletionSource = new TaskCompletionSource<TResult>();
var continuationTask = task.ContinueWith(t => MapResult(t, mapper, continuationCompletionSource), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, actorTaskScheduler);
return continuationTask.Unwrap();
}
public static Task<TResult> ContinueOnActor<TSource, TArg, TResult>(this Task<TSource> task, Func<TSource, TArg, TResult> mapper, TArg arg) {
return task.ContinueOnActor(result => mapper(result, arg));
}
private static Task<TResult> MapResult<TSource, TResult>(Task<TSource> task, Func<TSource, TResult> mapper, TaskCompletionSource<TResult> completionSource) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(mapper(task.Result));
}
return completionSource.Task;
}
}

View File

@ -1,16 +1,24 @@
using NetMQ;
using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
private readonly ClientSocket socket;
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
public Task IsReady => isReady.Task;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
this.socket = socket;
}
public void SetIsReady() {
isReady.TrySetResult();
}
private protected override ValueTask Send(byte[] bytes) {
return socket.SendAsync(bytes);
}