mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-17 12:24:49 +02:00
Compare commits
4 Commits
9a4ac4604e
...
cf609a02f8
Author | SHA1 | Date | |
---|---|---|---|
cf609a02f8
|
|||
23f9e079c1
|
|||
8f8e0cf2e0
|
|||
a65f22f97d
|
@@ -52,6 +52,8 @@ public sealed class AgentServices {
|
|||||||
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
public async Task<bool> Register(ControllerConnection connection, CancellationToken cancellationToken) {
|
||||||
Logger.Information("Registering with the controller...");
|
Logger.Information("Registering with the controller...");
|
||||||
|
|
||||||
|
// TODO NEED TO SEND WHEN SERVER RESTARTS!!!
|
||||||
|
|
||||||
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
|
ImmutableArray<ConfigureInstanceMessage> configureInstanceMessages;
|
||||||
try {
|
try {
|
||||||
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
configureInstanceMessages = await connection.Send<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(new RegisterAgentMessage(AgentInfo), TimeSpan.FromMinutes(1), cancellationToken);
|
||||||
|
@@ -21,6 +21,7 @@ using Phantom.Utils.Actor.Mailbox;
|
|||||||
using Phantom.Utils.Actor.Tasks;
|
using Phantom.Utils.Actor.Tasks;
|
||||||
using Phantom.Utils.Collections;
|
using Phantom.Utils.Collections;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Agents;
|
namespace Phantom.Controller.Services.Agents;
|
||||||
@@ -167,13 +168,13 @@ sealed class AgentActor : ReceiveActor<AgentActor.ICommand> {
|
|||||||
return configurationMessages.ToImmutable();
|
return configurationMessages.ToImmutable();
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ICommand {}
|
public interface ICommand;
|
||||||
|
|
||||||
private sealed record InitializeCommand : ICommand;
|
private sealed record InitializeCommand : ICommand;
|
||||||
|
|
||||||
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcConnectionToClient<IMessageToAgent> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
|
public sealed record RegisterCommand(AgentConfiguration Configuration, RpcServerToClientConnection<IMessageToController, IMessageToAgent> Connection) : ICommand, ICanReply<ImmutableArray<ConfigureInstanceMessage>>;
|
||||||
|
|
||||||
public sealed record UnregisterCommand(RpcConnectionToClient<IMessageToAgent> Connection) : ICommand;
|
public sealed record UnregisterCommand(RpcServerToClientConnection<IMessageToController, IMessageToAgent> Connection) : ICommand;
|
||||||
|
|
||||||
private sealed record RefreshConnectionStatusCommand : ICommand;
|
private sealed record RefreshConnectionStatusCommand : ICommand;
|
||||||
|
|
||||||
|
@@ -1,35 +1,29 @@
|
|||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Agents;
|
namespace Phantom.Controller.Services.Agents;
|
||||||
|
|
||||||
sealed class AgentConnection {
|
sealed class AgentConnection(Guid agentGuid, string agentName) {
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create<AgentConnection>();
|
private static readonly ILogger Logger = PhantomLogger.Create<AgentConnection>();
|
||||||
|
|
||||||
private readonly Guid agentGuid;
|
private string agentName = agentName;
|
||||||
private string agentName;
|
private RpcServerToClientConnection<IMessageToController, IMessageToAgent>? connection;
|
||||||
|
|
||||||
private RpcConnectionToClient<IMessageToAgent>? connection;
|
public void UpdateConnection(RpcServerToClientConnection<IMessageToController, IMessageToAgent> newConnection, string newAgentName) {
|
||||||
|
|
||||||
public AgentConnection(Guid agentGuid, string agentName) {
|
|
||||||
this.agentName = agentName;
|
|
||||||
this.agentGuid = agentGuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void UpdateConnection(RpcConnectionToClient<IMessageToAgent> newConnection, string newAgentName) {
|
|
||||||
lock (this) {
|
lock (this) {
|
||||||
connection?.Close();
|
connection?.CloseSession();
|
||||||
connection = newConnection;
|
connection = newConnection;
|
||||||
agentName = newAgentName;
|
agentName = newAgentName;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool CloseIfSame(RpcConnectionToClient<IMessageToAgent> expected) {
|
public bool CloseIfSame(RpcServerToClientConnection<IMessageToController, IMessageToAgent> expected) {
|
||||||
lock (this) {
|
lock (this) {
|
||||||
if (connection != null && connection.IsSame(expected)) {
|
if (connection != null && connection.IsSame(expected)) {
|
||||||
connection.Close();
|
connection.CloseSession();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -44,7 +38,7 @@ sealed class AgentConnection {
|
|||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
return connection.Send(message);
|
return connection.SendChannel.SendMessage(message).AsTask();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,10 +46,10 @@ sealed class AgentConnection {
|
|||||||
lock (this) {
|
lock (this) {
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
LogAgentOffline();
|
LogAgentOffline();
|
||||||
return Task.FromResult<TReply?>(default);
|
return Task.FromResult<TReply?>(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
return connection.Send<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken)!;
|
return connection.SendChannel.SendMessage<TMessage, TReply>(message, waitForReplyTime, waitForReplyCancellationToken)!;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -13,7 +13,7 @@ using Phantom.Controller.Minecraft;
|
|||||||
using Phantom.Controller.Services.Users.Sessions;
|
using Phantom.Controller.Services.Users.Sessions;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Agents;
|
namespace Phantom.Controller.Services.Agents;
|
||||||
@@ -22,7 +22,6 @@ sealed class AgentManager {
|
|||||||
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
|
private static readonly ILogger Logger = PhantomLogger.Create<AgentManager>();
|
||||||
|
|
||||||
private readonly IActorRefFactory actorSystem;
|
private readonly IActorRefFactory actorSystem;
|
||||||
private readonly AuthToken authToken;
|
|
||||||
private readonly ControllerState controllerState;
|
private readonly ControllerState controllerState;
|
||||||
private readonly MinecraftVersions minecraftVersions;
|
private readonly MinecraftVersions minecraftVersions;
|
||||||
private readonly UserLoginManager userLoginManager;
|
private readonly UserLoginManager userLoginManager;
|
||||||
@@ -32,9 +31,8 @@ sealed class AgentManager {
|
|||||||
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
|
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
|
||||||
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
|
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
|
||||||
|
|
||||||
public AgentManager(IActorRefFactory actorSystem, AuthToken authToken, ControllerState controllerState, MinecraftVersions minecraftVersions, UserLoginManager userLoginManager, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
|
public AgentManager(IActorRefFactory actorSystem, ControllerState controllerState, MinecraftVersions minecraftVersions, UserLoginManager userLoginManager, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
|
||||||
this.actorSystem = actorSystem;
|
this.actorSystem = actorSystem;
|
||||||
this.authToken = authToken;
|
|
||||||
this.controllerState = controllerState;
|
this.controllerState = controllerState;
|
||||||
this.minecraftVersions = minecraftVersions;
|
this.minecraftVersions = minecraftVersions;
|
||||||
this.userLoginManager = userLoginManager;
|
this.userLoginManager = userLoginManager;
|
||||||
@@ -63,18 +61,10 @@ sealed class AgentManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<bool> RegisterAgent(AuthToken authToken, AgentInfo agentInfo, RpcConnectionToClient<IMessageToAgent> connection) {
|
public async Task<ImmutableArray<ConfigureInstanceMessage>> RegisterAgent(AgentInfo agentInfo, RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
||||||
if (!this.authToken.FixedTimeEquals(authToken)) {
|
|
||||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.InvalidToken));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
var agentProperties = AgentConfiguration.From(agentInfo);
|
var agentProperties = AgentConfiguration.From(agentInfo);
|
||||||
var agentActor = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
var agentActor = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
||||||
var configureInstanceMessages = await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
return await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
||||||
await connection.Send(new RegisterAgentSuccessMessage(configureInstanceMessages));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
||||||
|
@@ -1,8 +1,4 @@
|
|||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Phantom.Common.Messages.Agent;
|
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
|
||||||
using Phantom.Common.Messages.Web;
|
|
||||||
using Phantom.Common.Messages.Web.ToController;
|
|
||||||
using Phantom.Controller.Database;
|
using Phantom.Controller.Database;
|
||||||
using Phantom.Controller.Minecraft;
|
using Phantom.Controller.Minecraft;
|
||||||
using Phantom.Controller.Services.Agents;
|
using Phantom.Controller.Services.Agents;
|
||||||
@@ -13,8 +9,8 @@ using Phantom.Controller.Services.Users;
|
|||||||
using Phantom.Controller.Services.Users.Sessions;
|
using Phantom.Controller.Services.Users.Sessions;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Rpc;
|
using Phantom.Utils.Rpc;
|
||||||
using IMessageFromAgentToController = Phantom.Common.Messages.Agent.IMessageToController;
|
using IRpcAgentRegistrar = Phantom.Utils.Rpc.Runtime.Server.IRpcServerClientRegistrar<Phantom.Common.Messages.Agent.IMessageToController, Phantom.Common.Messages.Agent.IMessageToAgent>;
|
||||||
using IMessageFromWebToController = Phantom.Common.Messages.Web.IMessageToController;
|
using IRpcWebRegistrar = Phantom.Utils.Rpc.Runtime.Server.IRpcServerClientRegistrar<Phantom.Common.Messages.Web.IMessageToController, Phantom.Common.Messages.Web.IMessageToWeb>;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services;
|
namespace Phantom.Controller.Services;
|
||||||
|
|
||||||
@@ -37,13 +33,13 @@ public sealed class ControllerServices : IDisposable {
|
|||||||
private AuditLogManager AuditLogManager { get; }
|
private AuditLogManager AuditLogManager { get; }
|
||||||
private EventLogManager EventLogManager { get; }
|
private EventLogManager EventLogManager { get; }
|
||||||
|
|
||||||
public IRegistrationHandler<IMessageToAgent, IMessageFromAgentToController, RegisterAgentMessage> AgentRegistrationHandler { get; }
|
public IRpcAgentRegistrar AgentRegistrar { get; }
|
||||||
public IRegistrationHandler<IMessageToWeb, IMessageFromWebToController, RegisterWebMessage> WebRegistrationHandler { get; }
|
public IRpcWebRegistrar WebRegistrar { get; }
|
||||||
|
|
||||||
private readonly IDbContextProvider dbProvider;
|
private readonly IDbContextProvider dbProvider;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, AuthToken webAuthToken, CancellationToken shutdownCancellationToken) {
|
public ControllerServices(IDbContextProvider dbProvider, AuthToken agentAuthToken, CancellationToken shutdownCancellationToken) {
|
||||||
this.dbProvider = dbProvider;
|
this.dbProvider = dbProvider;
|
||||||
this.cancellationToken = shutdownCancellationToken;
|
this.cancellationToken = shutdownCancellationToken;
|
||||||
|
|
||||||
@@ -59,14 +55,14 @@ public sealed class ControllerServices : IDisposable {
|
|||||||
this.UserLoginManager = new UserLoginManager(AuthenticatedUserCache, UserManager, dbProvider);
|
this.UserLoginManager = new UserLoginManager(AuthenticatedUserCache, UserManager, dbProvider);
|
||||||
this.PermissionManager = new PermissionManager(dbProvider);
|
this.PermissionManager = new PermissionManager(dbProvider);
|
||||||
|
|
||||||
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, UserLoginManager, dbProvider, cancellationToken);
|
this.AgentManager = new AgentManager(ActorSystem, ControllerState, MinecraftVersions, UserLoginManager, dbProvider, cancellationToken);
|
||||||
this.InstanceLogManager = new InstanceLogManager();
|
this.InstanceLogManager = new InstanceLogManager();
|
||||||
|
|
||||||
this.AuditLogManager = new AuditLogManager(dbProvider);
|
this.AuditLogManager = new AuditLogManager(dbProvider);
|
||||||
this.EventLogManager = new EventLogManager(ControllerState, ActorSystem, dbProvider, shutdownCancellationToken);
|
this.EventLogManager = new EventLogManager(ControllerState, ActorSystem, dbProvider, shutdownCancellationToken);
|
||||||
|
|
||||||
this.AgentRegistrationHandler = new AgentRegistrationHandler(AgentManager, InstanceLogManager, EventLogManager);
|
this.AgentRegistrar = new AgentClientRegistrar(ActorSystem, AgentManager, InstanceLogManager, EventLogManager);
|
||||||
this.WebRegistrationHandler = new WebRegistrationHandler(webAuthToken, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager);
|
this.WebRegistrar = new WebClientRegistrar(ActorSystem, ControllerState, InstanceLogManager, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, MinecraftVersions, EventLogManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Initialize() {
|
public async Task Initialize() {
|
||||||
|
@@ -0,0 +1,21 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Controller.Services.Agents;
|
||||||
|
using Phantom.Controller.Services.Events;
|
||||||
|
using Phantom.Controller.Services.Instances;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
|
|
||||||
|
sealed class AgentClientRegistrar(
|
||||||
|
IActorRefFactory actorSystem,
|
||||||
|
AgentManager agentManager,
|
||||||
|
InstanceLogManager instanceLogManager,
|
||||||
|
EventLogManager eventLogManager
|
||||||
|
) : IRpcServerClientRegistrar<IMessageToController, IMessageToAgent> {
|
||||||
|
public ActorRef<IMessageToController> Register(Guid sessionId, RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
||||||
|
var init = new AgentMessageHandlerActor.Init(connection, agentManager, instanceLogManager, eventLogManager);
|
||||||
|
return actorSystem.ActorOf(AgentMessageHandlerActor.Factory(init), "AgentClient-" + sessionId);
|
||||||
|
}
|
||||||
|
}
|
@@ -1,4 +1,4 @@
|
|||||||
using Phantom.Common.Data.Replies;
|
using System.Collections.Immutable;
|
||||||
using Phantom.Common.Messages.Agent;
|
using Phantom.Common.Messages.Agent;
|
||||||
using Phantom.Common.Messages.Agent.ToAgent;
|
using Phantom.Common.Messages.Agent.ToAgent;
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
@@ -6,32 +6,31 @@ using Phantom.Controller.Services.Agents;
|
|||||||
using Phantom.Controller.Services.Events;
|
using Phantom.Controller.Services.Events;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
|
|
||||||
sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||||
public readonly record struct Init(Guid AgentGuid, RpcConnectionToClient<IMessageToAgent> Connection, AgentRegistrationHandler AgentRegistrationHandler, AgentManager AgentManager, InstanceLogManager InstanceLogManager, EventLogManager EventLogManager);
|
public readonly record struct Init(RpcServerToClientConnection<IMessageToController, IMessageToAgent> Connection, AgentManager AgentManager, InstanceLogManager InstanceLogManager, EventLogManager EventLogManager);
|
||||||
|
|
||||||
public static Props<IMessageToController> Factory(Init init) {
|
public static Props<IMessageToController> Factory(Init init) {
|
||||||
return Props<IMessageToController>.Create(() => new AgentMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
return Props<IMessageToController>.Create(() => new AgentMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly Guid agentGuid;
|
private readonly RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection;
|
||||||
private readonly RpcConnectionToClient<IMessageToAgent> connection;
|
|
||||||
private readonly AgentRegistrationHandler agentRegistrationHandler;
|
|
||||||
private readonly AgentManager agentManager;
|
private readonly AgentManager agentManager;
|
||||||
private readonly InstanceLogManager instanceLogManager;
|
private readonly InstanceLogManager instanceLogManager;
|
||||||
private readonly EventLogManager eventLogManager;
|
private readonly EventLogManager eventLogManager;
|
||||||
|
|
||||||
|
private Guid? agentGuid;
|
||||||
|
|
||||||
private AgentMessageHandlerActor(Init init) {
|
private AgentMessageHandlerActor(Init init) {
|
||||||
this.agentGuid = init.AgentGuid;
|
|
||||||
this.connection = init.Connection;
|
this.connection = init.Connection;
|
||||||
this.agentRegistrationHandler = init.AgentRegistrationHandler;
|
|
||||||
this.agentManager = init.AgentManager;
|
this.agentManager = init.AgentManager;
|
||||||
this.instanceLogManager = init.InstanceLogManager;
|
this.instanceLogManager = init.InstanceLogManager;
|
||||||
this.eventLogManager = init.EventLogManager;
|
this.eventLogManager = init.EventLogManager;
|
||||||
|
|
||||||
ReceiveAsync<RegisterAgentMessage>(HandleRegisterAgent);
|
ReceiveAsyncAndReply<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(HandleRegisterAgent);
|
||||||
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
|
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
|
||||||
Receive<AgentIsAliveMessage>(HandleAgentIsAlive);
|
Receive<AgentIsAliveMessage>(HandleAgentIsAlive);
|
||||||
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
|
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
|
||||||
@@ -42,42 +41,42 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
Receive<InstanceOutputMessage>(HandleInstanceOutput);
|
Receive<InstanceOutputMessage>(HandleInstanceOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleRegisterAgent(RegisterAgentMessage message) {
|
private Guid RequireAgentGuid() {
|
||||||
if (agentGuid != message.AgentInfo.AgentGuid) {
|
return agentGuid ?? throw new InvalidOperationException("Agent has not registered yet.");
|
||||||
await connection.Send(new RegisterAgentFailureMessage(RegisterAgentError.ConnectionAlreadyHasAnAgent));
|
}
|
||||||
}
|
|
||||||
else {
|
private Task<ImmutableArray<ConfigureInstanceMessage>> HandleRegisterAgent(RegisterAgentMessage message) {
|
||||||
await agentRegistrationHandler.TryRegisterImpl(connection, message);
|
agentGuid = message.AgentInfo.AgentGuid;
|
||||||
}
|
return agentManager.RegisterAgent(message.AgentInfo, connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
|
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.UnregisterCommand(connection));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UnregisterCommand(connection));
|
||||||
connection.Close();
|
connection.CloseSession();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleAgentIsAlive(AgentIsAliveMessage message) {
|
private void HandleAgentIsAlive(AgentIsAliveMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.NotifyIsAliveCommand());
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.NotifyIsAliveCommand());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleReportAgentStatus(ReportAgentStatusMessage message) {
|
private void HandleReportAgentStatus(ReportAgentStatusMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateStatsCommand(message.RunningInstanceCount, message.RunningInstanceMemory));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
|
private void HandleReportInstanceStatus(ReportInstanceStatusMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateInstanceStatusCommand(message.InstanceGuid, message.InstanceStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleReportInstancePlayerCounts(ReportInstancePlayerCountsMessage message) {
|
private void HandleReportInstancePlayerCounts(ReportInstancePlayerCountsMessage message) {
|
||||||
agentManager.TellAgent(agentGuid, new AgentActor.UpdateInstancePlayerCountsCommand(message.InstanceGuid, message.PlayerCounts));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateInstancePlayerCountsCommand(message.InstanceGuid, message.PlayerCounts));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleReportInstanceEvent(ReportInstanceEventMessage message) {
|
private void HandleReportInstanceEvent(ReportInstanceEventMessage message) {
|
||||||
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, agentGuid, message.InstanceGuid));
|
message.Event.Accept(eventLogManager.CreateInstanceEventVisitor(message.EventGuid, message.UtcTime, RequireAgentGuid(), message.InstanceGuid));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleInstanceOutput(InstanceOutputMessage message) {
|
private void HandleInstanceOutput(InstanceOutputMessage message) {
|
||||||
|
@@ -1,34 +0,0 @@
|
|||||||
using Phantom.Common.Messages.Agent;
|
|
||||||
using Phantom.Common.Messages.Agent.ToController;
|
|
||||||
using Phantom.Controller.Services.Agents;
|
|
||||||
using Phantom.Controller.Services.Events;
|
|
||||||
using Phantom.Controller.Services.Instances;
|
|
||||||
using Phantom.Utils.Actor;
|
|
||||||
using Phantom.Utils.Rpc.Runtime2;
|
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
|
||||||
|
|
||||||
sealed class AgentRegistrationHandler : IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage> {
|
|
||||||
private readonly AgentManager agentManager;
|
|
||||||
private readonly InstanceLogManager instanceLogManager;
|
|
||||||
private readonly EventLogManager eventLogManager;
|
|
||||||
|
|
||||||
public AgentRegistrationHandler(AgentManager agentManager, InstanceLogManager instanceLogManager, EventLogManager eventLogManager) {
|
|
||||||
this.agentManager = agentManager;
|
|
||||||
this.instanceLogManager = instanceLogManager;
|
|
||||||
this.eventLogManager = eventLogManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToAgent, IMessageToController, RegisterAgentMessage>.TryRegister(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
|
|
||||||
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(message.AgentInfo.AgentGuid, connection) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToAgent> connection, RegisterAgentMessage message) {
|
|
||||||
return agentManager.RegisterAgent(message.AuthToken, message.AgentInfo, connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Props<IMessageToController> CreateMessageHandlerActorProps(Guid agentGuid, RpcConnectionToClient<IMessageToAgent> connection) {
|
|
||||||
var init = new AgentMessageHandlerActor.Init(agentGuid, connection, this, agentManager, instanceLogManager, eventLogManager);
|
|
||||||
return AgentMessageHandlerActor.Factory(init);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -0,0 +1,31 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Phantom.Common.Messages.Web;
|
||||||
|
using Phantom.Controller.Minecraft;
|
||||||
|
using Phantom.Controller.Services.Agents;
|
||||||
|
using Phantom.Controller.Services.Events;
|
||||||
|
using Phantom.Controller.Services.Instances;
|
||||||
|
using Phantom.Controller.Services.Users;
|
||||||
|
using Phantom.Controller.Services.Users.Sessions;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
|
|
||||||
|
sealed class WebClientRegistrar(
|
||||||
|
IActorRefFactory actorSystem,
|
||||||
|
ControllerState controllerState,
|
||||||
|
InstanceLogManager instanceLogManager,
|
||||||
|
UserManager userManager,
|
||||||
|
RoleManager roleManager,
|
||||||
|
UserRoleManager userRoleManager,
|
||||||
|
UserLoginManager userLoginManager,
|
||||||
|
AuditLogManager auditLogManager,
|
||||||
|
AgentManager agentManager,
|
||||||
|
MinecraftVersions minecraftVersions,
|
||||||
|
EventLogManager eventLogManager
|
||||||
|
) : IRpcServerClientRegistrar<IMessageToController, IMessageToWeb> {
|
||||||
|
public ActorRef<IMessageToController> Register(Guid sessionId, RpcServerToClientConnection<IMessageToController, IMessageToWeb> connection) {
|
||||||
|
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
||||||
|
return actorSystem.ActorOf(WebMessageHandlerActor.Factory(init), "WebClient-" + sessionId);
|
||||||
|
}
|
||||||
|
}
|
@@ -5,17 +5,18 @@ using Phantom.Common.Messages.Web;
|
|||||||
using Phantom.Common.Messages.Web.ToWeb;
|
using Phantom.Common.Messages.Web.ToWeb;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
|
|
||||||
sealed class WebMessageDataUpdateSenderActor : ReceiveActor<WebMessageDataUpdateSenderActor.ICommand> {
|
sealed class WebMessageDataUpdateSenderActor : ReceiveActor<WebMessageDataUpdateSenderActor.ICommand> {
|
||||||
public readonly record struct Init(RpcConnectionToClient<IMessageToWeb> Connection, ControllerState ControllerState, InstanceLogManager InstanceLogManager);
|
public readonly record struct Init(RpcSendChannel<IMessageToWeb> Connection, ControllerState ControllerState, InstanceLogManager InstanceLogManager);
|
||||||
|
|
||||||
public static Props<ICommand> Factory(Init init) {
|
public static Props<ICommand> Factory(Init init) {
|
||||||
return Props<ICommand>.Create(() => new WebMessageDataUpdateSenderActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
return Props<ICommand>.Create(() => new WebMessageDataUpdateSenderActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly RpcConnectionToClient<IMessageToWeb> connection;
|
private readonly RpcSendChannel<IMessageToWeb> connection;
|
||||||
private readonly ControllerState controllerState;
|
private readonly ControllerState controllerState;
|
||||||
private readonly InstanceLogManager instanceLogManager;
|
private readonly InstanceLogManager instanceLogManager;
|
||||||
private readonly ActorRef<ICommand> selfCached;
|
private readonly ActorRef<ICommand> selfCached;
|
||||||
@@ -69,18 +70,18 @@ sealed class WebMessageDataUpdateSenderActor : ReceiveActor<WebMessageDataUpdate
|
|||||||
private sealed record RefreshUserSessionCommand(Guid UserGuid) : ICommand;
|
private sealed record RefreshUserSessionCommand(Guid UserGuid) : ICommand;
|
||||||
|
|
||||||
private Task RefreshAgents(RefreshAgentsCommand command) {
|
private Task RefreshAgents(RefreshAgentsCommand command) {
|
||||||
return connection.Send(new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray()));
|
return connection.SendMessage(new RefreshAgentsMessage(command.Agents.Values.ToImmutableArray())).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task RefreshInstances(RefreshInstancesCommand command) {
|
private Task RefreshInstances(RefreshInstancesCommand command) {
|
||||||
return connection.Send(new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray()));
|
return connection.SendMessage(new RefreshInstancesMessage(command.Instances.Values.ToImmutableArray())).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task ReceiveInstanceLogs(ReceiveInstanceLogsCommand command) {
|
private Task ReceiveInstanceLogs(ReceiveInstanceLogsCommand command) {
|
||||||
return connection.Send(new InstanceOutputMessage(command.InstanceGuid, command.Lines));
|
return connection.SendMessage(new InstanceOutputMessage(command.InstanceGuid, command.Lines)).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task RefreshUserSession(RefreshUserSessionCommand command) {
|
private Task RefreshUserSession(RefreshUserSessionCommand command) {
|
||||||
return connection.Send(new RefreshUserSessionMessage(command.UserGuid));
|
return connection.SendMessage(new RefreshUserSessionMessage(command.UserGuid)).AsTask();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -16,13 +16,14 @@ using Phantom.Controller.Services.Instances;
|
|||||||
using Phantom.Controller.Services.Users;
|
using Phantom.Controller.Services.Users;
|
||||||
using Phantom.Controller.Services.Users.Sessions;
|
using Phantom.Controller.Services.Users.Sessions;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
|
|
||||||
sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
||||||
public readonly record struct Init(
|
public readonly record struct Init(
|
||||||
RpcConnectionToClient<IMessageToWeb> Connection,
|
RpcServerToClientConnection<IMessageToController, IMessageToWeb> Connection,
|
||||||
WebRegistrationHandler WebRegistrationHandler,
|
WebClientRegistrar WebClientRegistrar,
|
||||||
ControllerState ControllerState,
|
ControllerState ControllerState,
|
||||||
InstanceLogManager InstanceLogManager,
|
InstanceLogManager InstanceLogManager,
|
||||||
UserManager UserManager,
|
UserManager UserManager,
|
||||||
@@ -39,8 +40,7 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
return Props<IMessageToController>.Create(() => new WebMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
return Props<IMessageToController>.Create(() => new WebMessageHandlerActor(init), new ActorConfiguration { SupervisorStrategy = SupervisorStrategies.Resume });
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly RpcConnectionToClient<IMessageToWeb> connection;
|
private readonly RpcServerToClientConnection<IMessageToController, IMessageToWeb> connection;
|
||||||
private readonly WebRegistrationHandler webRegistrationHandler;
|
|
||||||
private readonly ControllerState controllerState;
|
private readonly ControllerState controllerState;
|
||||||
private readonly UserManager userManager;
|
private readonly UserManager userManager;
|
||||||
private readonly RoleManager roleManager;
|
private readonly RoleManager roleManager;
|
||||||
@@ -53,7 +53,6 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
|
|
||||||
private WebMessageHandlerActor(Init init) {
|
private WebMessageHandlerActor(Init init) {
|
||||||
this.connection = init.Connection;
|
this.connection = init.Connection;
|
||||||
this.webRegistrationHandler = init.WebRegistrationHandler;
|
|
||||||
this.controllerState = init.ControllerState;
|
this.controllerState = init.ControllerState;
|
||||||
this.userManager = init.UserManager;
|
this.userManager = init.UserManager;
|
||||||
this.roleManager = init.RoleManager;
|
this.roleManager = init.RoleManager;
|
||||||
@@ -64,11 +63,10 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
this.minecraftVersions = init.MinecraftVersions;
|
this.minecraftVersions = init.MinecraftVersions;
|
||||||
this.eventLogManager = init.EventLogManager;
|
this.eventLogManager = init.EventLogManager;
|
||||||
|
|
||||||
var senderActorInit = new WebMessageDataUpdateSenderActor.Init(connection, controllerState, init.InstanceLogManager);
|
var senderActorInit = new WebMessageDataUpdateSenderActor.Init(connection.SendChannel, controllerState, init.InstanceLogManager);
|
||||||
Context.ActorOf(WebMessageDataUpdateSenderActor.Factory(senderActorInit), "DataUpdateSender");
|
Context.ActorOf(WebMessageDataUpdateSenderActor.Factory(senderActorInit), "DataUpdateSender");
|
||||||
|
|
||||||
ReceiveAsync<RegisterWebMessage>(HandleRegisterWeb);
|
ReceiveAsync<UnregisterWebMessage>(HandleUnregisterWeb);
|
||||||
Receive<UnregisterWebMessage>(HandleUnregisterWeb);
|
|
||||||
ReceiveAndReplyLater<LogInMessage, Optional<LogInSuccess>>(HandleLogIn);
|
ReceiveAndReplyLater<LogInMessage, Optional<LogInSuccess>>(HandleLogIn);
|
||||||
Receive<LogOutMessage>(HandleLogOut);
|
Receive<LogOutMessage>(HandleLogOut);
|
||||||
ReceiveAndReply<GetAuthenticatedUser, Optional<AuthenticatedUserInfo>>(GetAuthenticatedUser);
|
ReceiveAndReply<GetAuthenticatedUser, Optional<AuthenticatedUserInfo>>(GetAuthenticatedUser);
|
||||||
@@ -89,12 +87,8 @@ sealed class WebMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
|
ReceiveAndReplyLater<GetEventLogMessage, Result<ImmutableArray<EventLogItem>, UserActionFailure>>(HandleGetEventLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleRegisterWeb(RegisterWebMessage message) {
|
private Task HandleUnregisterWeb(UnregisterWebMessage message) {
|
||||||
await webRegistrationHandler.TryRegisterImpl(connection, message);
|
return connection.CloseSession();
|
||||||
}
|
|
||||||
|
|
||||||
private void HandleUnregisterWeb(UnregisterWebMessage message) {
|
|
||||||
connection.Close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task<Optional<LogInSuccess>> HandleLogIn(LogInMessage message) {
|
private Task<Optional<LogInSuccess>> HandleLogIn(LogInMessage message) {
|
||||||
|
@@ -1,67 +0,0 @@
|
|||||||
using Phantom.Common.Messages.Web;
|
|
||||||
using Phantom.Common.Messages.Web.ToController;
|
|
||||||
using Phantom.Common.Messages.Web.ToWeb;
|
|
||||||
using Phantom.Controller.Minecraft;
|
|
||||||
using Phantom.Controller.Services.Agents;
|
|
||||||
using Phantom.Controller.Services.Events;
|
|
||||||
using Phantom.Controller.Services.Instances;
|
|
||||||
using Phantom.Controller.Services.Users;
|
|
||||||
using Phantom.Controller.Services.Users.Sessions;
|
|
||||||
using Phantom.Utils.Actor;
|
|
||||||
using Phantom.Utils.Logging;
|
|
||||||
using Phantom.Utils.Rpc;
|
|
||||||
using Serilog;
|
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
|
||||||
|
|
||||||
sealed class WebRegistrationHandler : IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage> {
|
|
||||||
private static readonly ILogger Logger = PhantomLogger.Create<WebRegistrationHandler>();
|
|
||||||
|
|
||||||
private readonly AuthToken webAuthToken;
|
|
||||||
private readonly ControllerState controllerState;
|
|
||||||
private readonly InstanceLogManager instanceLogManager;
|
|
||||||
private readonly UserManager userManager;
|
|
||||||
private readonly RoleManager roleManager;
|
|
||||||
private readonly UserRoleManager userRoleManager;
|
|
||||||
private readonly UserLoginManager userLoginManager;
|
|
||||||
private readonly AuditLogManager auditLogManager;
|
|
||||||
private readonly AgentManager agentManager;
|
|
||||||
private readonly MinecraftVersions minecraftVersions;
|
|
||||||
private readonly EventLogManager eventLogManager;
|
|
||||||
|
|
||||||
public WebRegistrationHandler(AuthToken webAuthToken, ControllerState controllerState, InstanceLogManager instanceLogManager, UserManager userManager, RoleManager roleManager, UserRoleManager userRoleManager, UserLoginManager userLoginManager, AuditLogManager auditLogManager, AgentManager agentManager, MinecraftVersions minecraftVersions, EventLogManager eventLogManager) {
|
|
||||||
this.webAuthToken = webAuthToken;
|
|
||||||
this.controllerState = controllerState;
|
|
||||||
this.userManager = userManager;
|
|
||||||
this.roleManager = roleManager;
|
|
||||||
this.userRoleManager = userRoleManager;
|
|
||||||
this.userLoginManager = userLoginManager;
|
|
||||||
this.auditLogManager = auditLogManager;
|
|
||||||
this.agentManager = agentManager;
|
|
||||||
this.minecraftVersions = minecraftVersions;
|
|
||||||
this.eventLogManager = eventLogManager;
|
|
||||||
this.instanceLogManager = instanceLogManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
async Task<Props<IMessageToController>?> IRegistrationHandler<IMessageToWeb, IMessageToController, RegisterWebMessage>.TryRegister(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
|
|
||||||
return await TryRegisterImpl(connection, message) ? CreateMessageHandlerActorProps(connection) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<bool> TryRegisterImpl(RpcConnectionToClient<IMessageToWeb> connection, RegisterWebMessage message) {
|
|
||||||
if (webAuthToken.FixedTimeEquals(message.AuthToken)) {
|
|
||||||
Logger.Information("Web authorized successfully.");
|
|
||||||
await connection.Send(new RegisterWebResultMessage(true));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Logger.Warning("Web failed to authorize, invalid token.");
|
|
||||||
await connection.Send(new RegisterWebResultMessage(false));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Props<IMessageToController> CreateMessageHandlerActorProps(RpcConnectionToClient<IMessageToWeb> connection) {
|
|
||||||
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
|
||||||
return WebMessageHandlerActor.Factory(init);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,4 +1,6 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using Phantom.Common.Messages.Agent;
|
||||||
|
using Phantom.Common.Messages.Web;
|
||||||
using Phantom.Controller;
|
using Phantom.Controller;
|
||||||
using Phantom.Controller.Database.Postgres;
|
using Phantom.Controller.Database.Postgres;
|
||||||
using Phantom.Controller.Services;
|
using Phantom.Controller.Services;
|
||||||
@@ -7,6 +9,8 @@ using Phantom.Utils.Logging;
|
|||||||
using Phantom.Utils.Rpc.Runtime.Server;
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Tasks;
|
using Phantom.Utils.Tasks;
|
||||||
|
using RpcAgentServer = Phantom.Utils.Rpc.Runtime.Server.RpcServer<Phantom.Common.Messages.Agent.IMessageToController, Phantom.Common.Messages.Agent.IMessageToAgent>;
|
||||||
|
using RpcWebServer = Phantom.Utils.Rpc.Runtime.Server.RpcServer<Phantom.Common.Messages.Web.IMessageToController, Phantom.Common.Messages.Web.IMessageToWeb>;
|
||||||
|
|
||||||
var shutdownCancellationTokenSource = new CancellationTokenSource();
|
var shutdownCancellationTokenSource = new CancellationTokenSource();
|
||||||
var shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
var shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
||||||
@@ -53,12 +57,28 @@ try {
|
|||||||
|
|
||||||
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
||||||
|
|
||||||
using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
|
using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, shutdownCancellationToken);
|
||||||
await controllerServices.Initialize();
|
await controllerServices.Initialize();
|
||||||
|
|
||||||
|
var agentConnectionParameters = new RpcServerConnectionParameters(
|
||||||
|
EndPoint: agentRpcServerHost,
|
||||||
|
Certificate: agentKeyData.Certificate,
|
||||||
|
AuthToken: agentKeyData.AuthToken,
|
||||||
|
SendQueueCapacity: 100,
|
||||||
|
PingInterval: TimeSpan.FromSeconds(10)
|
||||||
|
);
|
||||||
|
|
||||||
|
var webConnectionParameters = new RpcServerConnectionParameters(
|
||||||
|
EndPoint: webRpcServerHost,
|
||||||
|
Certificate: webKeyData.Certificate,
|
||||||
|
AuthToken: webKeyData.AuthToken,
|
||||||
|
SendQueueCapacity: 500,
|
||||||
|
PingInterval: TimeSpan.FromMinutes(1)
|
||||||
|
);
|
||||||
|
|
||||||
LinkedTasks<bool> rpcServerTasks = new LinkedTasks<bool>([
|
LinkedTasks<bool> rpcServerTasks = new LinkedTasks<bool>([
|
||||||
new RpcServer("Agent", agentRpcServerHost, agentKeyData.AuthToken, agentKeyData.Certificate).Run(shutdownCancellationToken),
|
new RpcAgentServer("Agent", agentConnectionParameters, AgentMessageRegistries.Definitions, controllerServices.AgentRegistrar).Run(shutdownCancellationToken),
|
||||||
new RpcServer("Web", webRpcServerHost, agentKeyData.AuthToken, webKeyData.Certificate).Run(shutdownCancellationToken),
|
new RpcWebServer("Web", webConnectionParameters, WebMessageRegistries.Definitions, controllerServices.WebRegistrar).Run(shutdownCancellationToken),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// If either RPC server crashes, stop the whole process.
|
// If either RPC server crashes, stop the whole process.
|
||||||
@@ -71,11 +91,6 @@ try {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// await Task.WhenAll(
|
|
||||||
// RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.AgentRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken),
|
|
||||||
// RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.WebRegistrationHandler, controllerServices.ActorSystem, shutdownCancellationToken)
|
|
||||||
// );
|
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
return 0;
|
return 0;
|
||||||
} catch (StopProcedureException) {
|
} catch (StopProcedureException) {
|
||||||
|
@@ -39,7 +39,7 @@ public static class PhantomLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>(string name1, string name2) {
|
public static ILogger Create<T>(string name1, string name2) {
|
||||||
return Create(ConcatNames(typeof(T).Name, ConcatNames(name1, name2)));
|
return Create(ConcatNames(typeof(T).Name, name1, name2));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>() {
|
public static ILogger Create<T1, T2>() {
|
||||||
@@ -47,13 +47,21 @@ public static class PhantomLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>(string name) {
|
public static ILogger Create<T1, T2>(string name) {
|
||||||
return Create(ConcatNames(typeof(T1).Name, ConcatNames(typeof(T2).Name, name)));
|
return Create(ConcatNames(typeof(T1).Name, typeof(T2).Name, name));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static string ConcatNames(string name1, string name2) {
|
public static ILogger Create<T1, T2>(string name1, string name2) {
|
||||||
|
return Create(ConcatNames(typeof(T1).Name, typeof(T2).Name, ConcatNames(name1, name2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static string ConcatNames(string name1, string name2) {
|
||||||
return name1 + ":" + name2;
|
return name1 + ":" + name2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static string ConcatNames(string name1, string name2, string name3) {
|
||||||
|
return ConcatNames(name1, ConcatNames(name2, name3));
|
||||||
|
}
|
||||||
|
|
||||||
public static void Dispose() {
|
public static void Dispose() {
|
||||||
Root.Dispose();
|
Root.Dispose();
|
||||||
Base.Dispose();
|
Base.Dispose();
|
||||||
|
@@ -25,7 +25,7 @@ interface IFrame {
|
|||||||
|
|
||||||
case TypeMessageId:
|
case TypeMessageId:
|
||||||
var messageFrame = await MessageFrame.Read(stream, cancellationToken);
|
var messageFrame = await MessageFrame.Read(stream, cancellationToken);
|
||||||
await reader.OnMessageFrame(messageFrame, stream, cancellationToken);
|
await reader.OnMessageFrame(messageFrame, cancellationToken);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TypeReplyId:
|
case TypeReplyId:
|
||||||
@@ -45,7 +45,7 @@ interface IFrame {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReadOnlyMemory<byte> Type { get; }
|
ReadOnlyMemory<byte> FrameType { get; }
|
||||||
|
|
||||||
Task Write(Stream stream, CancellationToken cancellationToken = default);
|
Task Write(Stream stream, CancellationToken cancellationToken = default);
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,7 @@
|
|||||||
namespace Phantom.Utils.Rpc.Frame;
|
namespace Phantom.Utils.Rpc.Frame;
|
||||||
|
|
||||||
interface IFrameReader {
|
interface IFrameReader {
|
||||||
Task OnMessageFrame(MessageFrame frame, Stream stream, CancellationToken cancellationToken);
|
Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken);
|
||||||
void OnReplyFrame(ReplyFrame frame);
|
void OnReplyFrame(ReplyFrame frame);
|
||||||
void OnErrorFrame(ErrorFrame frame);
|
void OnErrorFrame(ErrorFrame frame);
|
||||||
void OnUnknownFrameStart(byte id);
|
void OnUnknownFrameStart(byte id);
|
||||||
|
@@ -3,7 +3,7 @@
|
|||||||
namespace Phantom.Utils.Rpc.Frame.Types;
|
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||||
|
|
||||||
sealed record ErrorFrame(uint ReplyingToMessageId, RpcError Error) : IFrame {
|
sealed record ErrorFrame(uint ReplyingToMessageId, RpcError Error) : IFrame {
|
||||||
public ReadOnlyMemory<byte> Type => IFrame.TypeError;
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypeError;
|
||||||
|
|
||||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
await Serialization.WriteUnsignedInt(ReplyingToMessageId, stream, cancellationToken);
|
await Serialization.WriteUnsignedInt(ReplyingToMessageId, stream, cancellationToken);
|
||||||
|
@@ -5,7 +5,7 @@ namespace Phantom.Utils.Rpc.Frame.Types;
|
|||||||
sealed record MessageFrame(uint MessageId, ushort RegistryCode, ReadOnlyMemory<byte> SerializedMessage) : IFrame {
|
sealed record MessageFrame(uint MessageId, ushort RegistryCode, ReadOnlyMemory<byte> SerializedMessage) : IFrame {
|
||||||
public const int MaxMessageBytes = 1024 * 1024 * 8;
|
public const int MaxMessageBytes = 1024 * 1024 * 8;
|
||||||
|
|
||||||
public ReadOnlyMemory<byte> Type => IFrame.TypeMessage;
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypeMessage;
|
||||||
|
|
||||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
int messageLength = SerializedMessage.Length;
|
int messageLength = SerializedMessage.Length;
|
||||||
|
@@ -3,7 +3,7 @@
|
|||||||
sealed record PingFrame : IFrame {
|
sealed record PingFrame : IFrame {
|
||||||
public static PingFrame Instance { get; } = new PingFrame();
|
public static PingFrame Instance { get; } = new PingFrame();
|
||||||
|
|
||||||
public ReadOnlyMemory<byte> Type => IFrame.TypePing;
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypePing;
|
||||||
|
|
||||||
public Task Write(Stream stream, CancellationToken cancellationToken) {
|
public Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
|
@@ -5,7 +5,7 @@ namespace Phantom.Utils.Rpc.Frame.Types;
|
|||||||
sealed record ReplyFrame(uint ReplyingToMessageId, ReadOnlyMemory<byte> SerializedReply) : IFrame {
|
sealed record ReplyFrame(uint ReplyingToMessageId, ReadOnlyMemory<byte> SerializedReply) : IFrame {
|
||||||
public const int MaxReplyBytes = 1024 * 1024 * 32;
|
public const int MaxReplyBytes = 1024 * 1024 * 32;
|
||||||
|
|
||||||
public ReadOnlyMemory<byte> Type => IFrame.TypeReply;
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypeReply;
|
||||||
|
|
||||||
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
int replyLength = SerializedReply.Length;
|
int replyLength = SerializedReply.Length;
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
using Phantom.Utils.Actor;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Rpc.Frame.Types;
|
using Phantom.Utils.Rpc.Frame.Types;
|
||||||
using Phantom.Utils.Rpc.Runtime;
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
@@ -36,6 +37,10 @@ public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
|||||||
return messageType.GetInterfaces().Any(type => type.FullName is {} name && name.StartsWith(replyInterfaceName, StringComparison.Ordinal));
|
return messageType.GetInterfaces().Any(type => type.FullName is {} name && name.StartsWith(replyInterfaceName, StringComparison.Ordinal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal bool TryGetType(MessageFrame frame, [NotNullWhen(true)] out Type? type) {
|
||||||
|
return codeToTypeMapping.TryGetValue(frame.RegistryCode, out type);
|
||||||
|
}
|
||||||
|
|
||||||
internal MessageFrame CreateFrame<TMessage>(uint messageId, TMessage message) where TMessage : TMessageBase {
|
internal MessageFrame CreateFrame<TMessage>(uint messageId, TMessage message) where TMessage : TMessageBase {
|
||||||
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
if (typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
||||||
return new MessageFrame(messageId, code, Serialization.Serialize(message));
|
return new MessageFrame(messageId, code, Serialization.Serialize(message));
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc.Frame;
|
using Phantom.Utils.Rpc.Frame;
|
||||||
using Phantom.Utils.Rpc.Frame.Types;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
@@ -9,53 +8,38 @@ namespace Phantom.Utils.Rpc.Runtime.Client;
|
|||||||
|
|
||||||
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
|
public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> : IDisposable {
|
||||||
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
|
public static async Task<RpcClient<TClientToServerMessage, TServerToClientMessage>?> Connect(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, CancellationToken cancellationToken) {
|
||||||
RpcClientConnector connector = new RpcClientConnector(loggerName, connectionParameters);
|
RpcClientToServerConnector connector = new RpcClientToServerConnector(loggerName, connectionParameters);
|
||||||
RpcClientConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
|
RpcClientToServerConnector.Connection? connection = await connector.EstablishNewConnection(cancellationToken);
|
||||||
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
|
return connection == null ? null : new RpcClient<TClientToServerMessage, TServerToClientMessage>(loggerName, connectionParameters, messageDefinitions, connector, connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private readonly string loggerName;
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly MessageRegistry<TServerToClientMessage> serverToClientMessageRegistry;
|
private readonly MessageRegistry<TServerToClientMessage> serverToClientMessageRegistry;
|
||||||
private readonly RpcClientConnection connection;
|
private readonly RpcClientToServerConnection connection;
|
||||||
|
|
||||||
public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
|
public RpcSendChannel<TClientToServerMessage> SendChannel { get; }
|
||||||
|
|
||||||
private RpcClient(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientConnector connector, RpcClientConnector.Connection connection) {
|
private RpcClient(string loggerName, RpcClientConnectionParameters connectionParameters, IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions, RpcClientToServerConnector connector, RpcClientToServerConnector.Connection connection) {
|
||||||
|
this.loggerName = loggerName;
|
||||||
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
this.logger = PhantomLogger.Create<RpcClient<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||||
this.serverToClientMessageRegistry = messageDefinitions.ToClient;
|
this.serverToClientMessageRegistry = messageDefinitions.ToClient;
|
||||||
|
|
||||||
this.connection = new RpcClientConnection(loggerName, connector, connection);
|
this.connection = new RpcClientToServerConnection(loggerName, connector, connection);
|
||||||
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters, this.connection, messageDefinitions.ToServer);
|
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters.Common, this.connection, messageDefinitions.ToServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Listen(ActorRef<TServerToClientMessage> actor) {
|
public async Task Listen(ActorRef<TServerToClientMessage> actor) {
|
||||||
|
var handler = new MessageHandlerImpl(SendChannel, actor);
|
||||||
try {
|
try {
|
||||||
await connection.ReadConnection(stream => Receive(stream, new MessageHandlerImpl(SendChannel, actor)));
|
await connection.ReadConnection(stream => Receive(stream, handler));
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
||||||
await IFrame.ReadFrom(stream, new FrameReader(this, handler), CancellationToken.None);
|
await IFrame.ReadFrom(stream, new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, serverToClientMessageRegistry, handler, SendChannel), CancellationToken.None);
|
||||||
}
|
|
||||||
|
|
||||||
private sealed class FrameReader(RpcClient<TClientToServerMessage, TServerToClientMessage> client, MessageHandlerImpl handler) : IFrameReader {
|
|
||||||
public Task OnMessageFrame(MessageFrame frame, Stream stream, CancellationToken cancellationToken) {
|
|
||||||
return client.serverToClientMessageRegistry.Handle(frame, handler, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void OnReplyFrame(ReplyFrame frame) {
|
|
||||||
client.SendChannel.ReceiveReply(frame);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void OnErrorFrame(ErrorFrame frame) {
|
|
||||||
client.SendChannel.ReceiveError(frame.ReplyingToMessageId, frame.Error);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void OnUnknownFrameStart(byte id) {
|
|
||||||
client.logger.Error("Received unknown frame ID: {Id}", id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class MessageHandlerImpl(RpcSendChannel<TClientToServerMessage> sendChannel, ActorRef<TServerToClientMessage> actor) : MessageHandler<TServerToClientMessage> {
|
private sealed class MessageHandlerImpl(RpcSendChannel<TClientToServerMessage> sendChannel, ActorRef<TServerToClientMessage> actor) : MessageHandler<TServerToClientMessage> {
|
||||||
|
@@ -10,4 +10,6 @@ public readonly record struct RpcClientConnectionParameters(
|
|||||||
AuthToken AuthToken,
|
AuthToken AuthToken,
|
||||||
ushort SendQueueCapacity,
|
ushort SendQueueCapacity,
|
||||||
TimeSpan PingInterval
|
TimeSpan PingInterval
|
||||||
);
|
) {
|
||||||
|
internal RpcCommonConnectionParameters Common => new (SendQueueCapacity, PingInterval);
|
||||||
|
}
|
||||||
|
@@ -3,11 +3,11 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Client;
|
namespace Phantom.Utils.Rpc.Runtime.Client;
|
||||||
|
|
||||||
sealed class RpcClientConnection(string loggerName, RpcClientConnector connector, RpcClientConnector.Connection initialConnection) : IRpcConnectionProvider, IDisposable {
|
sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerConnector connector, RpcClientToServerConnector.Connection initialConnection) : IRpcConnectionProvider, IDisposable {
|
||||||
private readonly ILogger logger = PhantomLogger.Create<RpcClientConnection>(loggerName);
|
private readonly ILogger logger = PhantomLogger.Create<RpcClientToServerConnection>(loggerName);
|
||||||
|
|
||||||
private readonly SemaphoreSlim semaphore = new (1);
|
private readonly SemaphoreSlim semaphore = new (1);
|
||||||
private RpcClientConnector.Connection currentConnection = initialConnection;
|
private RpcClientToServerConnector.Connection currentConnection = initialConnection;
|
||||||
|
|
||||||
private readonly CancellationTokenSource newConnectionCancellationTokenSource = new ();
|
private readonly CancellationTokenSource newConnectionCancellationTokenSource = new ();
|
||||||
|
|
||||||
@@ -15,7 +15,7 @@ sealed class RpcClientConnection(string loggerName, RpcClientConnector connector
|
|||||||
return (await GetConnection()).Stream;
|
return (await GetConnection()).Stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<RpcClientConnector.Connection> GetConnection() {
|
private async Task<RpcClientToServerConnector.Connection> GetConnection() {
|
||||||
CancellationToken cancellationToken = newConnectionCancellationTokenSource.Token;
|
CancellationToken cancellationToken = newConnectionCancellationTokenSource.Token;
|
||||||
|
|
||||||
await semaphore.WaitAsync(cancellationToken);
|
await semaphore.WaitAsync(cancellationToken);
|
||||||
@@ -31,7 +31,7 @@ sealed class RpcClientConnection(string loggerName, RpcClientConnector connector
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async Task ReadConnection(Func<Stream, Task> reader) {
|
public async Task ReadConnection(Func<Stream, Task> reader) {
|
||||||
RpcClientConnector.Connection? connection = null;
|
RpcClientToServerConnector.Connection? connection = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
@@ -9,7 +9,7 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Client;
|
namespace Phantom.Utils.Rpc.Runtime.Client;
|
||||||
|
|
||||||
internal sealed class RpcClientConnector {
|
internal sealed class RpcClientToServerConnector {
|
||||||
private static readonly TimeSpan InitialRetryDelay = TimeSpan.FromMilliseconds(100);
|
private static readonly TimeSpan InitialRetryDelay = TimeSpan.FromMilliseconds(100);
|
||||||
private static readonly TimeSpan MaximumRetryDelay = TimeSpan.FromSeconds(30);
|
private static readonly TimeSpan MaximumRetryDelay = TimeSpan.FromSeconds(30);
|
||||||
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
||||||
@@ -21,8 +21,8 @@ internal sealed class RpcClientConnector {
|
|||||||
|
|
||||||
private bool loggedCertificateValidationError = false;
|
private bool loggedCertificateValidationError = false;
|
||||||
|
|
||||||
public RpcClientConnector(string loggerName, RpcClientConnectionParameters parameters) {
|
public RpcClientToServerConnector(string loggerName, RpcClientConnectionParameters parameters) {
|
||||||
this.logger = PhantomLogger.Create<RpcClientConnector>(loggerName);
|
this.logger = PhantomLogger.Create<RpcClientToServerConnector>(loggerName);
|
||||||
this.sessionId = Guid.NewGuid();
|
this.sessionId = Guid.NewGuid();
|
||||||
this.parameters = parameters;
|
this.parameters = parameters;
|
||||||
|
|
@@ -0,0 +1,6 @@
|
|||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
readonly record struct RpcCommonConnectionParameters(
|
||||||
|
ushort SendQueueCapacity,
|
||||||
|
TimeSpan PingInterval
|
||||||
|
);
|
33
Utils/Phantom.Utils.Rpc/Runtime/RpcFrameReader.cs
Normal file
33
Utils/Phantom.Utils.Rpc/Runtime/RpcFrameReader.cs
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Frame;
|
||||||
|
using Phantom.Utils.Rpc.Frame.Types;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Serilog;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(string loggerName, MessageRegistry<TReceivedMessage> messageRegistry, MessageHandler<TReceivedMessage> messageHandler, RpcSendChannel<TSentMessage> sendChannel) : IFrameReader {
|
||||||
|
private readonly ILogger logger = PhantomLogger.Create<RpcFrameReader<TSentMessage, TReceivedMessage>>(loggerName);
|
||||||
|
|
||||||
|
public Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken) {
|
||||||
|
if (messageRegistry.TryGetType(frame, out var messageType)) {
|
||||||
|
logger.Verbose("Received message {MesageId} of type {MessageType} ({Bytes} B).", frame.MessageId, messageType.Name, frame.SerializedMessage.Length);
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageRegistry.Handle(frame, messageHandler, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnReplyFrame(ReplyFrame frame) {
|
||||||
|
logger.Verbose("Received reply to message {MesageId} ({Bytes} B).", frame.ReplyingToMessageId, frame.SerializedReply.Length);
|
||||||
|
sendChannel.ReceiveReply(frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnErrorFrame(ErrorFrame frame) {
|
||||||
|
logger.Warning("Received error response to message {MesageId}: {Error}", frame.ReplyingToMessageId, frame.Error);
|
||||||
|
sendChannel.ReceiveError(frame.ReplyingToMessageId, frame.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnUnknownFrameStart(byte id) {
|
||||||
|
logger.Error("Received unknown frame ID: {FrameId}", id);
|
||||||
|
}
|
||||||
|
}
|
@@ -5,7 +5,6 @@ using Phantom.Utils.Logging;
|
|||||||
using Phantom.Utils.Rpc.Frame;
|
using Phantom.Utils.Rpc.Frame;
|
||||||
using Phantom.Utils.Rpc.Frame.Types;
|
using Phantom.Utils.Rpc.Frame.Types;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Client;
|
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
@@ -24,7 +23,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
|
|
||||||
private uint nextMessageId;
|
private uint nextMessageId;
|
||||||
|
|
||||||
internal RpcSendChannel(string loggerName, RpcClientConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
|
internal RpcSendChannel(string loggerName, RpcCommonConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
|
||||||
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
|
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
|
||||||
this.connectionProvider = connectionProvider;
|
this.connectionProvider = connectionProvider;
|
||||||
this.messageRegistry = messageRegistry;
|
this.messageRegistry = messageRegistry;
|
||||||
@@ -45,7 +44,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
return sendQueue.Writer.TryWrite(NextMessageFrame(message));
|
return sendQueue.Writer.TryWrite(NextMessageFrame(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask SendMessage<TMessage>(TMessage message, CancellationToken cancellationToken) where TMessage : TMessageBase {
|
public async ValueTask SendMessage<TMessage>(TMessage message, CancellationToken cancellationToken = default) where TMessage : TMessageBase {
|
||||||
await SendFrame(NextMessageFrame(message), cancellationToken);
|
await SendFrame(NextMessageFrame(message), cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,7 +94,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
await stream.WriteAsync(frame.Type);
|
await stream.WriteAsync(frame.FrameType);
|
||||||
await frame.Write(stream);
|
await frame.Write(stream);
|
||||||
await stream.FlushAsync();
|
await stream.FlushAsync();
|
||||||
}
|
}
|
||||||
@@ -108,6 +107,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
await Task.Delay(interval, cancellationToken);
|
await Task.Delay(interval, cancellationToken);
|
||||||
|
// TODO wait for pong?
|
||||||
|
|
||||||
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
|
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
|
||||||
logger.Warning("Skipped a ping due to a full queue.");
|
logger.Warning("Skipped a ping due to a full queue.");
|
||||||
|
@@ -0,0 +1,7 @@
|
|||||||
|
using Phantom.Utils.Actor;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
public interface IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> {
|
||||||
|
ActorRef<TClientToServerMessage> Register(Guid sessionId, RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> connection);
|
||||||
|
}
|
@@ -4,17 +4,26 @@ using System.Net.Sockets;
|
|||||||
using System.Security.Cryptography.X509Certificates;
|
using System.Security.Cryptography.X509Certificates;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Monads;
|
using Phantom.Utils.Monads;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Tls;
|
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken authToken, RpcServerCertificate certificate) {
|
public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
||||||
private readonly ILogger logger = PhantomLogger.Create<RpcServer>(loggerName);
|
string loggerName,
|
||||||
private readonly RpcServerClientManager clientManager = new ();
|
RpcServerConnectionParameters connectionParameters,
|
||||||
|
IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions,
|
||||||
|
IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar
|
||||||
|
) {
|
||||||
|
private readonly ILogger logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||||
|
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions = new (loggerName, connectionParameters.Common, messageDefinitions.ToClient);
|
||||||
|
|
||||||
private readonly List<Client> clients = [];
|
private readonly List<Client> clients = [];
|
||||||
|
|
||||||
public async Task<bool> Run(CancellationToken shutdownToken) {
|
public async Task<bool> Run(CancellationToken shutdownToken) {
|
||||||
|
EndPoint endPoint = connectionParameters.EndPoint;
|
||||||
|
|
||||||
SslServerAuthenticationOptions sslOptions = new () {
|
SslServerAuthenticationOptions sslOptions = new () {
|
||||||
AllowRenegotiation = false,
|
AllowRenegotiation = false,
|
||||||
AllowTlsResume = true,
|
AllowTlsResume = true,
|
||||||
@@ -22,7 +31,7 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
ClientCertificateRequired = false,
|
ClientCertificateRequired = false,
|
||||||
EnabledSslProtocols = TlsSupport.SupportedProtocols,
|
EnabledSslProtocols = TlsSupport.SupportedProtocols,
|
||||||
EncryptionPolicy = EncryptionPolicy.RequireEncryption,
|
EncryptionPolicy = EncryptionPolicy.RequireEncryption,
|
||||||
ServerCertificate = certificate.Certificate,
|
ServerCertificate = connectionParameters.Certificate.Certificate,
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -39,11 +48,13 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
try {
|
try {
|
||||||
logger.Information("Server listening on {EndPoint}.", endPoint);
|
logger.Information("Server listening on {EndPoint}.", endPoint);
|
||||||
|
|
||||||
while (!shutdownToken.IsCancellationRequested) {
|
while (true) {
|
||||||
Socket clientSocket = await serverSocket.AcceptAsync(shutdownToken);
|
Socket clientSocket = await serverSocket.AcceptAsync(shutdownToken);
|
||||||
clients.Add(new Client(clientManager, clientSocket, sslOptions, authToken, shutdownToken));
|
clients.Add(new Client(loggerName, messageDefinitions, clientRegistrar, clientSessions, clientSocket, sslOptions, connectionParameters.AuthToken, shutdownToken));
|
||||||
clients.RemoveAll(static client => client.Task.IsCompleted);
|
clients.RemoveAll(static client => client.Task.IsCompleted);
|
||||||
}
|
}
|
||||||
|
} catch (OperationCanceledException) {
|
||||||
|
// Ignore.
|
||||||
} finally {
|
} finally {
|
||||||
await Stop(serverSocket);
|
await Stop(serverSocket);
|
||||||
}
|
}
|
||||||
@@ -67,6 +78,7 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
await Task.WhenAll(clients.Select(static client => client.Task));
|
await Task.WhenAll(clients.Select(static client => client.Task));
|
||||||
|
await clientSessions.Shutdown();
|
||||||
} catch (Exception) {
|
} catch (Exception) {
|
||||||
// Ignore exceptions when shutting down.
|
// Ignore exceptions when shutting down.
|
||||||
}
|
}
|
||||||
@@ -81,16 +93,31 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
|
|
||||||
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
||||||
|
|
||||||
|
private readonly string loggerName;
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly RpcServerClientManager clientManager;
|
private readonly IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions;
|
||||||
|
private readonly IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar;
|
||||||
|
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions;
|
||||||
private readonly Socket socket;
|
private readonly Socket socket;
|
||||||
private readonly SslServerAuthenticationOptions sslOptions;
|
private readonly SslServerAuthenticationOptions sslOptions;
|
||||||
private readonly AuthToken authToken;
|
private readonly AuthToken authToken;
|
||||||
private readonly CancellationToken shutdownToken;
|
private readonly CancellationToken shutdownToken;
|
||||||
|
|
||||||
public Client(RpcServerClientManager clientManager, Socket socket, SslServerAuthenticationOptions sslOptions, AuthToken authToken, CancellationToken shutdownToken) {
|
public Client(
|
||||||
this.logger = PhantomLogger.Create<RpcServer, Client>(Address);
|
string serverLoggerName,
|
||||||
this.clientManager = clientManager;
|
IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions,
|
||||||
|
IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar,
|
||||||
|
RpcServerClientSessions<TServerToClientMessage> clientSessions,
|
||||||
|
Socket socket,
|
||||||
|
SslServerAuthenticationOptions sslOptions,
|
||||||
|
AuthToken authToken,
|
||||||
|
CancellationToken shutdownToken
|
||||||
|
) {
|
||||||
|
this.loggerName = PhantomLogger.ConcatNames(serverLoggerName, Address);
|
||||||
|
this.logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>, Client>(loggerName);
|
||||||
|
this.messageDefinitions = messageDefinitions;
|
||||||
|
this.clientRegistrar = clientRegistrar;
|
||||||
|
this.clientSessions = clientSessions;
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.sslOptions = sslOptions;
|
this.sslOptions = sslOptions;
|
||||||
this.authToken = authToken;
|
this.authToken = authToken;
|
||||||
@@ -103,21 +130,28 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
try {
|
try {
|
||||||
await using var stream = new SslStream(new NetworkStream(socket, ownsSocket: false), leaveInnerStreamOpen: false);
|
await using var stream = new SslStream(new NetworkStream(socket, ownsSocket: false), leaveInnerStreamOpen: false);
|
||||||
|
|
||||||
Guid? sessionId;
|
Guid? sessionIdResult;
|
||||||
try {
|
try {
|
||||||
sessionId = await InitializeConnection(stream);
|
sessionIdResult = await InitializeConnection(stream);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
logger.Warning("Cancelling incoming client due to shutdown.");
|
logger.Warning("Cancelling incoming client due to shutdown.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sessionId.HasValue) {
|
if (!sessionIdResult.HasValue) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Guid sessionId = sessionIdResult.Value;
|
||||||
logger.Information("Client connected.");
|
logger.Information("Client connected.");
|
||||||
|
|
||||||
clientManager.SetConnection(sessionId.Value, stream);
|
var session = clientSessions.OnConnected(sessionId, stream);
|
||||||
|
try {
|
||||||
|
var connection = new RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage>(loggerName, clientSessions, sessionId, stream, session, messageDefinitions.ToServer);
|
||||||
|
await connection.Listen(clientRegistrar.Register(sessionId, connection));
|
||||||
|
} finally {
|
||||||
|
clientSessions.OnDisconnected(sessionId);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
logger.Information("Disconnecting client...");
|
logger.Information("Disconnecting client...");
|
||||||
try {
|
try {
|
||||||
@@ -129,6 +163,7 @@ public sealed class RpcServer(string loggerName, EndPoint endPoint, AuthToken au
|
|||||||
logger.Error(e, "Could not disconnect client socket.");
|
logger.Error(e, "Could not disconnect client socket.");
|
||||||
} finally {
|
} finally {
|
||||||
socket.Close();
|
socket.Close();
|
||||||
|
logger.Information("Client socket closed.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,4 +0,0 @@
|
|||||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
|
||||||
|
|
||||||
public sealed class RpcServerClientConnection {
|
|
||||||
}
|
|
@@ -1,13 +0,0 @@
|
|||||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
|
||||||
|
|
||||||
sealed class RpcServerClientManager {
|
|
||||||
private readonly Dictionary<Guid, RpcServerClientConnection> connectionsBySessionId = new ();
|
|
||||||
|
|
||||||
internal void SetConnection(Guid sessionId, Stream stream) {
|
|
||||||
connectionsBySessionId.AddOrUpdate(sessionId, id => {
|
|
||||||
return new RpcServerClientConnection(id, stream);
|
|
||||||
}, connection => {
|
|
||||||
return connection;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@@ -0,0 +1,48 @@
|
|||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
sealed class RpcServerClientSession<TServerToClientMessage> : IRpcConnectionProvider {
|
||||||
|
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
||||||
|
|
||||||
|
private TaskCompletionSource<Stream> nextStream = new ();
|
||||||
|
|
||||||
|
public RpcServerClientSession(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||||
|
this.SendChannel = new RpcSendChannel<TServerToClientMessage>(loggerName, connectionParameters, this, messageRegistry);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnConnected(Stream stream) {
|
||||||
|
lock (this) {
|
||||||
|
if (!nextStream.Task.IsCanceled && !nextStream.TrySetResult(stream)) {
|
||||||
|
nextStream = new TaskCompletionSource<Stream>();
|
||||||
|
nextStream.SetResult(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnDisconnected() {
|
||||||
|
lock (this) {
|
||||||
|
var task = nextStream.Task;
|
||||||
|
if (task is { IsCompleted: true, IsCanceled: false }) {
|
||||||
|
nextStream = new TaskCompletionSource<Stream>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Task<Stream> IRpcConnectionProvider.GetStream() {
|
||||||
|
lock (this) {
|
||||||
|
return nextStream.Task;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Close() {
|
||||||
|
lock (this) {
|
||||||
|
if (!nextStream.TrySetCanceled()) {
|
||||||
|
nextStream = new TaskCompletionSource<Stream>();
|
||||||
|
nextStream.SetCanceled();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return SendChannel.Close();
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,44 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
sealed class RpcServerClientSessions<TServerToClientMessage>(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||||
|
private readonly ConcurrentDictionary<Guid, RpcServerClientSession<TServerToClientMessage>> sessionsById = new ();
|
||||||
|
|
||||||
|
internal RpcSendChannel<TServerToClientMessage> OnConnected(Guid sessionId, Stream stream) {
|
||||||
|
var session = sessionsById.GetOrAdd(sessionId, CreateSession);
|
||||||
|
session.OnConnected(stream);
|
||||||
|
return session.SendChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RpcServerClientSession<TServerToClientMessage> CreateSession(Guid sessionId) {
|
||||||
|
return new RpcServerClientSession<TServerToClientMessage>(PhantomLogger.ConcatNames(loggerName, sessionId.ToString()), connectionParameters, messageRegistry);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnDisconnected(Guid sessionId) {
|
||||||
|
if (sessionsById.TryGetValue(sessionId, out var session)) {
|
||||||
|
session.OnDisconnected();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task CloseSession(Guid sessionId) {
|
||||||
|
if (sessionsById.Remove(sessionId, out var session)) {
|
||||||
|
return session.Close();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Shutdown() {
|
||||||
|
List<Task> tasks = [];
|
||||||
|
|
||||||
|
foreach (Guid guid in sessionsById.Keys) {
|
||||||
|
tasks.Add(CloseSession(guid));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.WhenAll(tasks);
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,14 @@
|
|||||||
|
using System.Net;
|
||||||
|
using Phantom.Utils.Rpc.Runtime.Tls;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
public readonly record struct RpcServerConnectionParameters(
|
||||||
|
EndPoint EndPoint,
|
||||||
|
RpcServerCertificate Certificate,
|
||||||
|
AuthToken AuthToken,
|
||||||
|
ushort SendQueueCapacity,
|
||||||
|
TimeSpan PingInterval
|
||||||
|
) {
|
||||||
|
internal RpcCommonConnectionParameters Common => new (SendQueueCapacity, PingInterval);
|
||||||
|
}
|
@@ -0,0 +1,54 @@
|
|||||||
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Frame;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
|
public sealed class RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> {
|
||||||
|
private readonly string loggerName;
|
||||||
|
private readonly RpcServerClientSessions<TServerToClientMessage> sessions;
|
||||||
|
private readonly Guid sessionId;
|
||||||
|
private readonly Stream stream;
|
||||||
|
private readonly MessageRegistry<TClientToServerMessage> messageRegistry;
|
||||||
|
|
||||||
|
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
||||||
|
|
||||||
|
internal RpcServerToClientConnection(string parentLoggerName, RpcServerClientSessions<TServerToClientMessage> sessions, Guid sessionId, Stream stream, RpcSendChannel<TServerToClientMessage> sendChannel, MessageRegistry<TClientToServerMessage> messageRegistry) {
|
||||||
|
this.loggerName = PhantomLogger.ConcatNames(parentLoggerName, sessionId.ToString());
|
||||||
|
this.sessions = sessions;
|
||||||
|
this.sessionId = sessionId;
|
||||||
|
this.stream = stream;
|
||||||
|
this.messageRegistry = messageRegistry;
|
||||||
|
this.SendChannel = sendChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task CloseSession() {
|
||||||
|
return sessions.CloseSession(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal async Task Listen(ActorRef<TClientToServerMessage> actor) {
|
||||||
|
var handler = new MessageHandlerImpl(SendChannel, actor);
|
||||||
|
try {
|
||||||
|
await Receive(stream, handler);
|
||||||
|
} catch (OperationCanceledException) {
|
||||||
|
// Ignore.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
||||||
|
await IFrame.ReadFrom(stream, new RpcFrameReader<TServerToClientMessage, TClientToServerMessage>(loggerName, messageRegistry, handler, SendChannel), CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class MessageHandlerImpl(RpcSendChannel<TServerToClientMessage> sendChannel, ActorRef<TClientToServerMessage> actor) : MessageHandler<TClientToServerMessage> {
|
||||||
|
public ActorRef<TClientToServerMessage> Actor => actor;
|
||||||
|
|
||||||
|
public ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TClientToServerMessage, ICanReply<TReply> {
|
||||||
|
return sendChannel.SendReply(messageId, reply, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken) {
|
||||||
|
return sendChannel.SendError(messageId, error, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -6,7 +6,7 @@ namespace Phantom.Web.Services.Rpc;
|
|||||||
|
|
||||||
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> connection) {
|
public sealed class ControllerConnection(RpcSendChannel<IMessageToController> connection) {
|
||||||
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
public ValueTask Send<TMessage>(TMessage message) where TMessage : IMessageToController {
|
||||||
return connection.SendMessage(message, CancellationToken.None);
|
return connection.SendMessage(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
|
public Task<TReply> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken waitForReplyCancellationToken = default) where TMessage : IMessageToController, ICanReply<TReply> {
|
||||||
|
@@ -90,7 +90,7 @@ try {
|
|||||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage(), CancellationToken.None);
|
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage());
|
||||||
// TODO wait for acknowledgment
|
// TODO wait for acknowledgment
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
PhantomLogger.Root.Warning(e, "Could not unregister agent after shutdown.");
|
||||||
|
Reference in New Issue
Block a user