1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-19 00:42:50 +02:00

Compare commits

..

2 Commits

Author SHA1 Message Date
ebc2db9c49
WIP? 2024-03-09 21:58:22 +01:00
71acce3123
Implement actors in Controller via Akka.NET 2024-03-09 14:05:49 +01:00
10 changed files with 303 additions and 187 deletions

View File

@ -21,9 +21,11 @@ sealed class KeepAliveLoop {
private async Task Run() { private async Task Run() {
var cancellationToken = cancellationTokenSource.Token; var cancellationToken = cancellationTokenSource.Token;
Logger.Information("Started keep-alive loop.");
try { try {
await connection.IsReady.WaitAsync(cancellationToken);
Logger.Information("Started keep-alive loop.");
while (true) { while (true) {
await Task.Delay(KeepAliveInterval, cancellationToken); await Task.Delay(KeepAliveInterval, cancellationToken);
await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken); await connection.Send(new AgentIsAliveMessage()).WaitAsync(cancellationToken);

View File

@ -40,6 +40,8 @@ public sealed class MessageListener : IMessageToAgentListener {
} }
} }
connection.SetIsReady();
await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All)); await connection.Send(new AdvertiseJavaRuntimesMessage(agent.JavaRuntimeRepository.All));
await agent.InstanceSessionManager.RefreshAgentStatus(); await agent.InstanceSessionManager.RefreshAgentStatus();

View File

@ -1,25 +1,21 @@
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Akka.Actor; using Akka.Actor;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance; using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Java; using Phantom.Common.Data.Java;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent; using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance; using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Data.Web.Minecraft; using Phantom.Common.Data.Web.Minecraft;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
using Phantom.Controller.Minecraft; using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor; using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox; using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Logging; using Phantom.Utils.Logging;
using Phantom.Utils.Rpc.Runtime; using Phantom.Utils.Rpc.Runtime;
using Phantom.Utils.Tasks;
using Serilog; using Serilog;
namespace Phantom.Controller.Services.Agents; namespace Phantom.Controller.Services.Agents;
@ -30,7 +26,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5); private static readonly TimeSpan DisconnectionRecheckInterval = TimeSpan.FromSeconds(5);
private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12); private static readonly TimeSpan DisconnectionThreshold = TimeSpan.FromSeconds(12);
public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, TaskManager TaskManager, CancellationToken CancellationToken); public readonly record struct Init(AgentConfiguration Configuration, ControllerState ControllerState, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) { public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name }); return Props<ICommand>.Create(() => new AgentActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
@ -39,7 +35,6 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
private readonly ControllerState controllerState; private readonly ControllerState controllerState;
private readonly MinecraftVersions minecraftVersions; private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider; private readonly IDbContextProvider dbProvider;
private readonly TaskManager taskManager;
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private AgentConfiguration configuration; private AgentConfiguration configuration;
@ -66,36 +61,32 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
} }
private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor; private readonly ActorRef<AgentDatabaseStorageActor.ICommand> databaseStorageActor;
private readonly ActorRef<AgentInstanceRouterActor.ICommand> instanceRouterActor;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private readonly Dictionary<Guid, Instance> instanceDataByGuid = new (); private readonly Dictionary<Guid, Instance> instanceDataByGuid = new ();
private AgentActor(Init init) { private AgentActor(Init init) {
this.controllerState = init.ControllerState; this.controllerState = init.ControllerState;
this.minecraftVersions = init.MinecraftVersions; this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider; this.dbProvider = init.DbProvider;
this.taskManager = init.TaskManager;
this.cancellationToken = init.CancellationToken; this.cancellationToken = init.CancellationToken;
this.configuration = init.Configuration; this.configuration = init.Configuration;
this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName); this.connection = new AgentConnection(configuration.AgentGuid, configuration.AgentName);
this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage"); this.databaseStorageActor = Context.ActorOf(AgentDatabaseStorageActor.Factory(new AgentDatabaseStorageActor.Init(configuration.AgentGuid, dbProvider, cancellationToken)), "DatabaseStorage");
this.instanceRouterActor = Context.ActorOf(AgentInstanceRouterActor.Factory(new AgentInstanceRouterActor.Init(SelfTyped, connection, minecraftVersions, dbProvider, cancellationToken)), "InstanceRouter");
NotifyAgentUpdated(); NotifyAgentUpdated();
ReceiveAsync<InitializeCommand>(Initialize); ReceiveAsync<InitializeCommand>(Initialize);
ReceiveAndReply<RegisterCommand, ImmutableArray<Instance>>(Register); ReceiveAsyncAndReply<RegisterCommand, ImmutableArray<ConfigureInstanceMessage>>(Register);
Receive<UnregisterCommand>(Unregister); Receive<UnregisterCommand>(Unregister);
Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus); Receive<RefreshConnectionStatusCommand>(RefreshConnectionStatus);
Receive<NotifyIsAliveCommand>(NotifyIsAlive); Receive<NotifyIsAliveCommand>(NotifyIsAlive);
Receive<UpdateStatsCommand>(UpdateStats); Receive<UpdateStatsCommand>(UpdateStats);
Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes); Receive<UpdateJavaRuntimesCommand>(UpdateJavaRuntimes);
ReceiveAsyncAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance); Receive<RouteToInstanceCommand>(RouteToInstance);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
Receive<LaunchInstanceCommand>(LaunchInstance);
Receive<StopInstanceCommand>(StopInstance);
Receive<SendMinecraftCommandCommand>(SendMinecraftCommand);
Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData); Receive<ReceiveInstanceDataCommand>(ReceiveInstanceData);
} }
@ -109,12 +100,9 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self); Context.System.Scheduler.ScheduleTellRepeatedly(DisconnectionRecheckInterval, DisconnectionRecheckInterval, Self, new RefreshConnectionStatusCommand(), Self);
} }
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) { private void CreateNewInstance(Instance instance) {
UpdateInstanceData(instance); UpdateInstanceData(instance);
instanceRouterActor.Tell(new AgentInstanceRouterActor.InitializeInstanceCommand(instance));
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instance.Configuration.InstanceGuid, instanceActor);
return instanceActor;
} }
private void UpdateInstanceData(Instance instance) { private void UpdateInstanceData(Instance instance) {
@ -122,26 +110,22 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
controllerState.UpdateInstance(instance); controllerState.UpdateInstance(instance);
} }
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) { private async Task<ImmutableArray<ConfigureInstanceMessage>> PrepareInitialConfigurationMessages() {
var init = new InstanceActor.Init(instance, SelfTyped, connection, dbProvider, cancellationToken); var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name); foreach (var (instanceConfiguration, _, launchAutomatically) in instanceDataByGuid.Values.ToImmutableArray()) {
} var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) { return configurationMessages.ToImmutable();
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Forward(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
} }
public interface ICommand {} public interface ICommand {}
private sealed record InitializeCommand : ICommand; private sealed record InitializeCommand : ICommand;
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<Instance>>; public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand; public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgentListener> Connection) : ICommand;
@ -153,15 +137,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand; public sealed record UpdateJavaRuntimesCommand(ImmutableArray<TaggedJavaRuntime> JavaRuntimes) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>; public sealed record RouteToInstanceCommand(AgentInstanceRouterActor.ICommand Command) : ICommand;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead; public sealed record ReceiveInstanceDataCommand(Instance Instance) : ICommand, IJumpAhead;
@ -185,19 +161,21 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
} }
} }
private ImmutableArray<Instance> Register(RegisterCommand command) { private async Task<ImmutableArray<ConfigureInstanceMessage>> Register(RegisterCommand command) {
var configurationMessages = await PrepareInitialConfigurationMessages();
configuration = command.Configuration; configuration = command.Configuration;
connection.UpdateConnection(command.Connection, configuration.AgentName); connection.UpdateConnection(command.Connection, configuration.AgentName);
lastPingTime = DateTimeOffset.Now; lastPingTime = DateTimeOffset.Now;
isOnline = true; isOnline = true;
NotifyAgentUpdated(); NotifyAgentUpdated();
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory)); databaseStorageActor.Tell(new AgentDatabaseStorageActor.StoreAgentCommand(configuration.AgentName, configuration.ProtocolVersion, configuration.BuildVersion, configuration.MaxInstances, configuration.MaxMemory));
Logger.Information("Registered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid); return configurationMessages;
return instanceDataByGuid.Values.ToImmutableArray();
} }
private void Unregister(UnregisterCommand command) { private void Unregister(UnregisterCommand command) {
@ -207,11 +185,7 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
isOnline = false; isOnline = false;
NotifyAgentUpdated(); NotifyAgentUpdated();
var setStatusCommand = new InstanceActor.SetStatusCommand(InstanceStatus.Offline); instanceRouterActor.Tell(new AgentInstanceRouterActor.MarkInstancesAsOfflineCommand());
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(setStatusCommand);
}
Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid); Logger.Information("Unregistered agent \"{Name}\" (GUID {Guid}).", configuration.AgentName, configuration.AgentGuid);
} }
@ -244,83 +218,11 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
javaRuntimes = command.JavaRuntimes; javaRuntimes = command.JavaRuntimes;
controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes); controllerState.UpdateAgentJavaRuntimes(configuration.AgentGuid, javaRuntimes);
} }
private async Task CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2) {
var instanceConfiguration = command.Configuration;
var instanceName = instanceConfiguration.InstanceName;
var instanceGuid = instanceConfiguration.InstanceGuid;
if (string.IsNullOrWhiteSpace(instanceName)) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
return;
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
return;
}
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken);
if (serverExecutableInfo == null) {
result2.SetResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
return;
}
bool isNewInstance = !instanceActorByGuid.TryGetValue(instanceGuid, out var instanceActorRef);
if (isNewInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isNewInstance);
taskManager.Run("Configure instance " + instanceGuid, async () => await ConfigureInstance(instanceActorRef, configureInstanceCommand, configuration.AgentName, result2, cancellationToken));
}
[SuppressMessage("ReSharper", "ConvertIfStatementToConditionalTernaryExpression")] private void RouteToInstance(RouteToInstanceCommand command) {
private static async Task ConfigureInstance(ActorRef<InstanceActor.ICommand> instanceActorRef, InstanceActor.ConfigureInstanceCommand command, string agentName, TaskCompletionSource<InstanceActionResult<CreateOrUpdateInstanceResult>> result2, CancellationToken cancellationToken) { instanceRouterActor.Forward(command.Command);
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var result = await instanceActorRef.Request(command, cancellationToken);
if (result.Is(ConfigureInstanceResult.Success)) {
if (command.IsNewInstance) {
Logger.Information("Added instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
else {
Logger.Information("Edited instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\".", instanceName, instanceGuid, agentName);
}
}
else {
if (command.IsNewInstance) {
Logger.Information("Failed adding instance \"{InstanceName}\" (GUID {InstanceGuid}) to agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
else {
Logger.Information("Failed editing instance \"{InstanceName}\" (GUID {InstanceGuid}) in agent \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, agentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
}
result2.SetResult(result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
}));
} }
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private void LaunchInstance(LaunchInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private void StopInstance(StopInstanceCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private void SendMinecraftCommand(SendMinecraftCommandCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SendMinecraftCommandCommand(command.AuditLogUserGuid, command.Command));
}
private void ReceiveInstanceData(ReceiveInstanceDataCommand command) { private void ReceiveInstanceData(ReceiveInstanceDataCommand command) {
UpdateInstanceData(command.Instance); UpdateInstanceData(command.Instance);
} }

View File

@ -0,0 +1,188 @@
using Phantom.Common.Data;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Minecraft;
using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Instance;
using Phantom.Controller.Database;
using Phantom.Controller.Minecraft;
using Phantom.Controller.Services.Instances;
using Phantom.Utils.Actor;
using Phantom.Utils.Actor.Mailbox;
using Phantom.Utils.Actor.Tasks;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Controller.Services.Agents;
sealed class AgentInstanceRouterActor : ReceiveActor<AgentInstanceRouterActor.ICommand> {
private static readonly ILogger Logger = PhantomLogger.Create<AgentInstanceRouterActor>();
public readonly record struct Init(ActorRef<AgentActor.ICommand> AgentActorRef, AgentConnection AgentConnection, MinecraftVersions MinecraftVersions, IDbContextProvider DbProvider, CancellationToken CancellationToken);
public static Props<ICommand> Factory(Init init) {
return Props<ICommand>.Create(() => new AgentInstanceRouterActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume, MailboxType = UnboundedJumpAheadMailbox.Name });
}
private readonly ActorRef<AgentActor.ICommand> agentActorRef;
private readonly AgentConnection agentConnection;
private readonly MinecraftVersions minecraftVersions;
private readonly IDbContextProvider dbProvider;
private readonly CancellationToken cancellationToken;
private readonly Dictionary<Guid, ActorRef<InstanceActor.ICommand>> instanceActorByGuid = new ();
private AgentInstanceRouterActor(Init init) {
this.agentActorRef = init.AgentActorRef;
this.agentConnection = init.AgentConnection;
this.minecraftVersions = init.MinecraftVersions;
this.dbProvider = init.DbProvider;
this.cancellationToken = init.CancellationToken;
Receive<InitializeInstanceCommand>(InitializeInstance);
Receive<MarkInstancesAsOfflineCommand>(MarkInstancesAsOffline);
Receive<UpdateInstanceStatusCommand>(UpdateInstanceStatus);
ReceiveAndReplyLater<CreateOrUpdateInstanceCommand, InstanceActionResult<CreateOrUpdateInstanceResult>>(CreateOrUpdateInstance);
ReceiveAndReplyLater<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAndReplyLater<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAndReplyLater<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
}
private ActorRef<InstanceActor.ICommand> CreateNewInstance(Instance instance) {
var instanceGuid = instance.Configuration.InstanceGuid;
if (instanceActorByGuid.ContainsKey(instanceGuid)) {
throw new InvalidOperationException("Instance already exists: " + instanceGuid);
}
var instanceActor = CreateInstanceActor(instance);
instanceActorByGuid.Add(instanceGuid, instanceActor);
return instanceActor;
}
private ActorRef<InstanceActor.ICommand> CreateInstanceActor(Instance instance) {
var init = new InstanceActor.Init(instance, agentActorRef, agentConnection, dbProvider, cancellationToken);
var name = "Instance:" + instance.Configuration.InstanceGuid;
return Context.ActorOf(InstanceActor.Factory(init), name);
}
private void TellInstance(Guid instanceGuid, InstanceActor.ICommand command) {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
instance.Tell(command);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
}
}
private void TellAllInstances(InstanceActor.ICommand command) {
foreach (var instance in instanceActorByGuid.Values) {
instance.Tell(command);
}
}
private Task<InstanceActionResult<TReply>> RequestInstance<TCommand, TReply>(Guid instanceGuid, TCommand command) where TCommand : InstanceActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (instanceActorByGuid.TryGetValue(instanceGuid, out var instance)) {
return instance.Request(command, cancellationToken);
}
else {
Logger.Warning("Could not deliver command {CommandType} to instance {InstanceGuid}, instance not found.", command.GetType().Name, instanceGuid);
return Task.FromResult(InstanceActionResult.General<TReply>(InstanceActionGeneralResult.InstanceDoesNotExist));
}
}
public interface ICommand {}
public sealed record InitializeInstanceCommand(Instance Instance) : ICommand;
public sealed record CreateOrUpdateInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration) : ICommand, ICanReply<InstanceActionResult<CreateOrUpdateInstanceResult>>;
public sealed record MarkInstancesAsOfflineCommand : ICommand;
public sealed record UpdateInstanceStatusCommand(Guid InstanceGuid, IInstanceStatus Status) : ICommand;
public sealed record LaunchInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendCommandToInstanceCommand(Guid InstanceGuid, Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void InitializeInstance(InitializeInstanceCommand command) {
CreateNewInstance(command.Instance);
}
private void MarkInstancesAsOffline(MarkInstancesAsOfflineCommand command) {
TellAllInstances(new InstanceActor.SetStatusCommand(InstanceStatus.Offline));
}
private void UpdateInstanceStatus(UpdateInstanceStatusCommand command) {
TellInstance(command.InstanceGuid, new InstanceActor.SetStatusCommand(command.Status));
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance(CreateOrUpdateInstanceCommand command) {
var instanceConfiguration = command.Configuration;
if (string.IsNullOrWhiteSpace(instanceConfiguration.InstanceName)) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceNameMustNotBeEmpty));
}
if (instanceConfiguration.MemoryAllocation <= RamAllocationUnits.Zero) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.InstanceMemoryMustNotBeZero));
}
return minecraftVersions.GetServerExecutableInfo(instanceConfiguration.MinecraftVersion, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance1, command)
.Unwrap();
}
private Task<InstanceActionResult<CreateOrUpdateInstanceResult>> CreateOrUpdateInstance1(FileDownloadInfo? serverExecutableInfo, CreateOrUpdateInstanceCommand command) {
if (serverExecutableInfo == null) {
return Task.FromResult(InstanceActionResult.Concrete(CreateOrUpdateInstanceResult.MinecraftVersionDownloadInfoNotFound));
}
var instanceConfiguration = command.Configuration;
bool isCreatingInstance = !instanceActorByGuid.TryGetValue(instanceConfiguration.InstanceGuid, out var instanceActorRef);
if (isCreatingInstance) {
instanceActorRef = CreateNewInstance(Instance.Offline(instanceConfiguration));
}
var configureInstanceCommand = new InstanceActor.ConfigureInstanceCommand(command.AuditLogUserGuid, instanceConfiguration, new InstanceLaunchProperties(serverExecutableInfo), isCreatingInstance);
return instanceActorRef.Request(configureInstanceCommand, cancellationToken)
.ContinueOnActor(CreateOrUpdateInstance2, configureInstanceCommand);
}
private InstanceActionResult<CreateOrUpdateInstanceResult> CreateOrUpdateInstance2(InstanceActionResult<ConfigureInstanceResult> result, InstanceActor.ConfigureInstanceCommand command) {
var instanceName = command.Configuration.InstanceName;
var instanceGuid = command.Configuration.InstanceGuid;
var isCreating = command.IsCreatingInstance;
if (result.Is(ConfigureInstanceResult.Success)) {
string action = isCreating ? "Added" : "Edited";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information(action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\".", instanceName, instanceGuid, configuration.AgentName);
}
else {
string action = isCreating ? "adding" : "editing";
string relation = isCreating ? "to agent" : "in agent";
Logger.Information("Failed " + action + " instance \"{InstanceName}\" (GUID {InstanceGuid}) " + relation + " \"{AgentName}\". {ErrorMessage}", instanceName, instanceGuid, configuration.AgentName, result.ToSentence(ConfigureInstanceResultExtensions.ToSentence));
}
return result.Map(static result => result switch {
ConfigureInstanceResult.Success => CreateOrUpdateInstanceResult.Success,
_ => CreateOrUpdateInstanceResult.UnknownError
});
}
private Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(LaunchInstanceCommand command) {
return RequestInstance<InstanceActor.LaunchInstanceCommand, LaunchInstanceResult>(command.InstanceGuid, new InstanceActor.LaunchInstanceCommand(command.AuditLogUserGuid));
}
private Task<InstanceActionResult<StopInstanceResult>> StopInstance(StopInstanceCommand command) {
return RequestInstance<InstanceActor.StopInstanceCommand, StopInstanceResult>(command.InstanceGuid, new InstanceActor.StopInstanceCommand(command.AuditLogUserGuid, command.StopStrategy));
}
private Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
return RequestInstance<InstanceActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(command.InstanceGuid, new InstanceActor.SendCommandToInstanceCommand(command.AuditLogUserGuid, command.Command));
}
}

View File

@ -1,12 +1,9 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Immutable;
using Akka.Actor; using Akka.Actor;
using Phantom.Common.Data; using Phantom.Common.Data;
using Phantom.Common.Data.Agent; using Phantom.Common.Data.Agent;
using Phantom.Common.Data.Instance;
using Phantom.Common.Data.Replies; using Phantom.Common.Data.Replies;
using Phantom.Common.Data.Web.Agent; using Phantom.Common.Data.Web.Agent;
using Phantom.Common.Data.Web.Instance;
using Phantom.Common.Messages.Agent; using Phantom.Common.Messages.Agent;
using Phantom.Common.Messages.Agent.ToAgent; using Phantom.Common.Messages.Agent.ToAgent;
using Phantom.Controller.Database; using Phantom.Controller.Database;
@ -68,23 +65,12 @@ sealed class AgentManager {
var agentProperties = AgentConfiguration.From(agentInfo); var agentProperties = AgentConfiguration.From(agentInfo);
var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties); var agentActorRef = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
var agentInstances = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken); var configureInstanceMessages = await agentActorRef.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
await connection.Send(new RegisterAgentSuccessMessage(await GetInstanceConfigurationsForAgent(agentInstances))); await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
return true; return true;
} }
private async Task<ImmutableArray<ConfigureInstanceMessage>> GetInstanceConfigurationsForAgent(ImmutableArray<Instance> instances) {
var configurationMessages = ImmutableArray.CreateBuilder<ConfigureInstanceMessage>();
foreach (var (configuration, _, launchAutomatically) in instances) {
var serverExecutableInfo = await minecraftVersions.GetServerExecutableInfo(configuration.MinecraftVersion, cancellationToken);
configurationMessages.Add(new ConfigureInstanceMessage(configuration, new InstanceLaunchProperties(serverExecutableInfo), launchAutomatically));
}
return configurationMessages.ToImmutable();
}
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) { public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) { if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
agent.Tell(command); agent.Tell(command);
@ -96,9 +82,9 @@ sealed class AgentManager {
} }
} }
public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentActor.ICommand, ICanReply<InstanceActionResult<TReply>> { public async Task<InstanceActionResult<TReply>> DoInstanceAction<TCommand, TReply>(Guid agentGuid, TCommand command) where TCommand : class, AgentInstanceRouterActor.ICommand, ICanReply<InstanceActionResult<TReply>> {
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) { if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
return await agent.Request(command, cancellationToken); return await agent.Request(new AgentActor.RouteToInstanceCommand(command), cancellationToken);
} }
else { else {
return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist); return InstanceActionResult.General<TReply>(InstanceActionGeneralResult.AgentDoesNotExist);

View File

@ -46,7 +46,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance); ReceiveAsyncAndReply<ConfigureInstanceCommand, InstanceActionResult<ConfigureInstanceResult>>(ConfigureInstance);
ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance); ReceiveAsyncAndReply<LaunchInstanceCommand, InstanceActionResult<LaunchInstanceResult>>(LaunchInstance);
ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance); ReceiveAsyncAndReply<StopInstanceCommand, InstanceActionResult<StopInstanceResult>>(StopInstance);
ReceiveAsyncAndReply<SendMinecraftCommandCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand); ReceiveAsyncAndReply<SendCommandToInstanceCommand, InstanceActionResult<SendCommandToInstanceResult>>(SendMinecraftCommand);
} }
private void NotifyInstanceUpdated() { private void NotifyInstanceUpdated() {
@ -63,13 +63,13 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand; public sealed record SetStatusCommand(IInstanceStatus Status) : ICommand;
public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsNewInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>; public sealed record ConfigureInstanceCommand(Guid AuditLogUserGuid, InstanceConfiguration Configuration, InstanceLaunchProperties LaunchProperties, bool IsCreatingInstance) : ICommand, ICanReply<InstanceActionResult<ConfigureInstanceResult>>;
public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>; public sealed record LaunchInstanceCommand(Guid AuditLogUserGuid) : ICommand, ICanReply<InstanceActionResult<LaunchInstanceResult>>;
public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>; public sealed record StopInstanceCommand(Guid AuditLogUserGuid, MinecraftStopStrategy StopStrategy) : ICommand, ICanReply<InstanceActionResult<StopInstanceResult>>;
public sealed record SendMinecraftCommandCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>; public sealed record SendCommandToInstanceCommand(Guid AuditLogUserGuid, string Command) : ICommand, ICanReply<InstanceActionResult<SendCommandToInstanceResult>>;
private void SetStatus(SetStatusCommand command) { private void SetStatus(SetStatusCommand command) {
status = command.Status; status = command.Status;
@ -98,11 +98,11 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments); entity.JvmArguments = JvmArgumentsHelper.Join(configuration.JvmArguments);
var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid); var auditLogWriter = new AuditLogRepository(db).Writer(command.AuditLogUserGuid);
if (command.IsNewInstance) { if (command.IsCreatingInstance) {
auditLogWriter.InstanceEdited(configuration.InstanceGuid); auditLogWriter.InstanceCreated(configuration.InstanceGuid);
} }
else { else {
auditLogWriter.InstanceCreated(configuration.InstanceGuid); auditLogWriter.InstanceEdited(configuration.InstanceGuid);
} }
await db.Ctx.SaveChangesAsync(cancellationToken); await db.Ctx.SaveChangesAsync(cancellationToken);
@ -148,7 +148,7 @@ sealed class InstanceActor : ReceiveActor<InstanceActor.ICommand> {
} }
} }
private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendMinecraftCommandCommand command) { private async Task<InstanceActionResult<SendCommandToInstanceResult>> SendMinecraftCommand(SendCommandToInstanceCommand command) {
var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command); var message = new SendCommandToInstanceMessage(InstanceGuid, command.Command);
var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message); var result = await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(message);

View File

@ -30,7 +30,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>(); private static readonly ILogger Logger = PhantomLogger.Create<WebMessageListener>();
private static int listenerSequenceId = 0; private static int listenerSequenceId = 0;
private readonly ActorRef<ICommand> actor; private readonly ActorRef<ICommand> actor;
private readonly RpcConnectionToClient<IMessageToWebListener> connection; private readonly RpcConnectionToClient<IMessageToWebListener> connection;
private readonly AuthToken authToken; private readonly AuthToken authToken;
@ -81,30 +81,30 @@ public sealed class WebMessageListener : IMessageToControllerListener {
} }
private readonly WebMessageListener listener; private readonly WebMessageListener listener;
private Actor(WebMessageListener listener) { private Actor(WebMessageListener listener) {
this.listener = listener; this.listener = listener;
Receive<StartConnectionCommand>(StartConnection); Receive<StartConnectionCommand>(StartConnection);
Receive<StopConnectionCommand>(StopConnection); Receive<StopConnectionCommand>(StopConnection);
Receive<RefreshAgentsCommand>(RefreshAgents); Receive<RefreshAgentsCommand>(RefreshAgents);
Receive<RefreshInstancesCommand>(RefreshInstances); Receive<RefreshInstancesCommand>(RefreshInstances);
} }
private void StartConnection(StartConnectionCommand command) { private void StartConnection(StartConnectionCommand command) {
listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state)); listener.controllerState.AgentsByGuidReceiver.Register(SelfTyped, static state => new RefreshAgentsCommand(state));
listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state)); listener.controllerState.InstancesByGuidReceiver.Register(SelfTyped, static state => new RefreshInstancesCommand(state));
listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived; listener.instanceLogManager.LogsReceived += HandleInstanceLogsReceived;
} }
private void StopConnection(StopConnectionCommand command) { private void StopConnection(StopConnectionCommand command) {
listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived; listener.instanceLogManager.LogsReceived -= HandleInstanceLogsReceived;
listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped); listener.controllerState.AgentsByGuidReceiver.Unregister(SelfTyped);
listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped); listener.controllerState.InstancesByGuidReceiver.Unregister(SelfTyped);
} }
private void RefreshAgents(RefreshAgentsCommand command) { private void RefreshAgents(RefreshAgentsCommand command) {
var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray()); var message = new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray());
listener.connection.Send(message); listener.connection.Send(message);
@ -114,7 +114,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray()); var message = new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray());
listener.connection.Send(message); listener.connection.Send(message);
} }
private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) { private void HandleInstanceLogsReceived(object? sender, InstanceLogManager.Event e) {
listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines)); listener.connection.Send(new InstanceOutputMessage(e.InstanceGuid, e.Lines));
} }
@ -123,11 +123,11 @@ public sealed class WebMessageListener : IMessageToControllerListener {
private interface ICommand {} private interface ICommand {}
private sealed record StartConnectionCommand : ICommand; private sealed record StartConnectionCommand : ICommand;
private sealed record StopConnectionCommand : ICommand; private sealed record StopConnectionCommand : ICommand;
private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand; private sealed record RefreshAgentsCommand(ImmutableDictionary<Guid, Agent> Agents) : ICommand;
private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand; private sealed record RefreshInstancesCommand(ImmutableDictionary<Guid, Instance> Instances) : ICommand;
public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) { public async Task<NoReply> HandleRegisterWeb(RegisterWebMessage message) {
@ -199,7 +199,7 @@ public sealed class WebMessageListener : IMessageToControllerListener {
} }
public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) { public Task<InstanceActionResult<SendCommandToInstanceResult>> HandleSendCommandToInstance(SendCommandToInstanceMessage message) {
return agentManager.DoInstanceAction<AgentActor.SendMinecraftCommandCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendMinecraftCommandCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command)); return agentManager.DoInstanceAction<AgentActor.SendCommandToInstanceCommand, SendCommandToInstanceResult>(message.AgentGuid, new AgentActor.SendCommandToInstanceCommand(message.InstanceGuid, message.LoggedInUserGuid, message.Command));
} }
public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) { public Task<ImmutableArray<MinecraftVersion>> HandleGetMinecraftVersions(GetMinecraftVersionsMessage message) {

View File

@ -5,47 +5,42 @@ namespace Phantom.Utils.Actor;
public abstract class ReceiveActor<TMessage> : ReceiveActor { public abstract class ReceiveActor<TMessage> : ReceiveActor {
protected ActorRef<TMessage> SelfTyped => new (Self); protected ActorRef<TMessage> SelfTyped => new (Self);
protected void ReceiveAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action) where TReplyableCommand : TMessage, ICanReply<TReply> { protected void ReceiveAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
Receive<TReplyableCommand>(message => HandleMessageWithReply(action, message)); Receive<TReplyableMessage>(message => HandleMessageWithReply(action, message));
} }
protected void ReceiveAsyncAndReply<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action) where TReplyableCommand : TMessage, ICanReply<TReply> { protected void ReceiveAndReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message)); // Must be async to set default task scheduler to actor scheduler.
ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyLater(action, message));
} }
protected void ReceiveAsyncAndReplyLater<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action) where TReplyableCommand : TMessage, ICanReply<TReply> { protected void ReceiveAsyncAndReply<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action) where TReplyableMessage : TMessage, ICanReply<TReply> {
ReceiveAsync<TReplyableCommand>(message => HandleMessageWithReplyAsync(action, message)); ReceiveAsync<TReplyableMessage>(message => HandleMessageWithReplyAsync(action, message));
} }
private void HandleMessageWithReply<TReplyableCommand, TReply>(Func<TReplyableCommand, TReply> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> { private void HandleMessageWithReply<TReplyableMessage, TReply>(Func<TReplyableMessage, TReply> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try { try {
Sender.Tell(action(message), Self); Sender.Tell(action(message), Self);
} catch (Exception e) { } catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self); Sender.Tell(new Status.Failure(e), Self);
} }
} }
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, Task<TReply>> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> { private Task HandleMessageWithReplyLater<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try { try {
Sender.Tell(await action(message), Self); action(message).PipeTo(Sender, Self);
} catch (Exception e) { } catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self); Sender.Tell(new Status.Failure(e), Self);
} }
}
private async Task HandleMessageWithReplyAsync<TReplyableCommand, TReply>(Func<TReplyableCommand, TaskCompletionSource<TReply>, Task> action, TReplyableCommand message) where TReplyableCommand : TMessage, ICanReply<TReply> {
var taskCompletionSource = new TaskCompletionSource<TReply>();
var sender = Sender;
taskCompletionSource.Task.ContinueWith(task => ((ICanTell) sender).Tell()) return Task.CompletedTask;
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
} }
protected void CompleteWith<TReply>(TaskCompletionSource<TReply> taskCompletionSource, Func<Task<TReply>> task) { private async Task HandleMessageWithReplyAsync<TReplyableMessage, TReply>(Func<TReplyableMessage, Task<TReply>> action, TReplyableMessage message) where TReplyableMessage : TMessage, ICanReply<TReply> {
try {
Sender.Tell(await action(message), Self);
} catch (Exception e) {
Sender.Tell(new Status.Failure(e), Self);
}
} }
} }

View File

@ -0,0 +1,33 @@
using Akka.Dispatch;
namespace Phantom.Utils.Actor.Tasks;
public static class TaskExtensions {
public static Task<TResult> ContinueOnActor<TSource, TResult>(this Task<TSource> task, Func<TSource, TResult> mapper) {
if (TaskScheduler.Current is not ActorTaskScheduler actorTaskScheduler) {
throw new InvalidOperationException("Task must be scheduled in Actor context!");
}
var continuationCompletionSource = new TaskCompletionSource<TResult>();
var continuationTask = task.ContinueWith(t => MapResult(t, mapper, continuationCompletionSource), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, actorTaskScheduler);
return continuationTask.Unwrap();
}
public static Task<TResult> ContinueOnActor<TSource, TArg, TResult>(this Task<TSource> task, Func<TSource, TArg, TResult> mapper, TArg arg) {
return task.ContinueOnActor(result => mapper(result, arg));
}
private static Task<TResult> MapResult<TSource, TResult>(Task<TSource> task, Func<TSource, TResult> mapper, TaskCompletionSource<TResult> completionSource) {
if (task.IsFaulted) {
completionSource.SetException(task.Exception.InnerExceptions);
}
else if (task.IsCanceled) {
completionSource.SetCanceled();
}
else {
completionSource.SetResult(mapper(task.Result));
}
return completionSource.Task;
}
}

View File

@ -1,16 +1,24 @@
using NetMQ; using NetMQ;
using NetMQ.Sockets; using NetMQ.Sockets;
using Phantom.Utils.Rpc.Message; using Phantom.Utils.Rpc.Message;
using Phantom.Utils.Tasks;
namespace Phantom.Utils.Rpc.Runtime; namespace Phantom.Utils.Rpc.Runtime;
public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> { public sealed class RpcConnectionToServer<TListener> : RpcConnection<TListener> {
private readonly ClientSocket socket; private readonly ClientSocket socket;
private readonly TaskCompletionSource isReady = AsyncTasks.CreateCompletionSource();
public Task IsReady => isReady.Task;
internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) { internal RpcConnectionToServer(string loggerName, ClientSocket socket, MessageRegistry<TListener> messageRegistry, MessageReplyTracker replyTracker) : base(loggerName, messageRegistry, replyTracker) {
this.socket = socket; this.socket = socket;
} }
public void SetIsReady() {
isReady.TrySetResult();
}
private protected override ValueTask Send(byte[] bytes) { private protected override ValueTask Send(byte[] bytes) {
return socket.SendAsync(bytes); return socket.SendAsync(bytes);
} }