mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-18 06:24:48 +02:00
Compare commits
5 Commits
cf609a02f8
...
61e6a3b54c
Author | SHA1 | Date | |
---|---|---|---|
61e6a3b54c
|
|||
f4972a16cd
|
|||
7ec9f3db91
|
|||
61e847c366
|
|||
1f6cc0532a
|
@@ -56,11 +56,6 @@ sealed class InstanceManagerActor : ReceiveActor<InstanceManagerActor.ICommand>
|
|||||||
ReceiveAsync<ShutdownCommand>(Shutdown);
|
ReceiveAsync<ShutdownCommand>(Shutdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
private string GetInstanceLoggerName(Guid guid) {
|
|
||||||
var prefix = guid.ToString();
|
|
||||||
return prefix[..prefix.IndexOf('-')] + "/" + Interlocked.Increment(ref instanceLoggerSequenceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed record InstanceInfo(ActorRef<InstanceActor.ICommand> Actor, InstanceConfiguration Configuration, IServerLauncher Launcher);
|
private sealed record InstanceInfo(ActorRef<InstanceActor.ICommand> Actor, InstanceConfiguration Configuration, IServerLauncher Launcher);
|
||||||
|
|
||||||
public interface ICommand {}
|
public interface ICommand {}
|
||||||
@@ -118,7 +113,8 @@ sealed class InstanceManagerActor : ReceiveActor<InstanceManagerActor.ICommand>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
var instanceInit = new InstanceActor.Init(agentState, instanceGuid, GetInstanceLoggerName(instanceGuid), instanceServices, instanceTicketManager, shutdownCancellationToken);
|
var instanceLoggerName = PhantomLogger.ShortenGuid(instanceGuid) + "/" + Interlocked.Increment(ref instanceLoggerSequenceId);;
|
||||||
|
var instanceInit = new InstanceActor.Init(agentState, instanceGuid, instanceLoggerName, instanceServices, instanceTicketManager, shutdownCancellationToken);
|
||||||
instances[instanceGuid] = instance = new InstanceInfo(Context.ActorOf(InstanceActor.Factory(instanceInit), "Instance-" + instanceGuid), configuration, launcher);
|
instances[instanceGuid] = instance = new InstanceInfo(Context.ActorOf(InstanceActor.Factory(instanceInit), "Instance-" + instanceGuid), configuration, launcher);
|
||||||
|
|
||||||
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
|
Logger.Information("Created instance \"{Name}\" (GUID {Guid}).", configuration.InstanceName, instanceGuid);
|
||||||
|
@@ -7,6 +7,7 @@ using Phantom.Common.Messages.Agent;
|
|||||||
using Phantom.Common.Messages.Agent.ToController;
|
using Phantom.Common.Messages.Agent.ToController;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Client;
|
using Phantom.Utils.Rpc.Runtime.Client;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Threading;
|
using Phantom.Utils.Threading;
|
||||||
@@ -74,7 +75,7 @@ try {
|
|||||||
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(agentServices);
|
var rpcMessageHandlerInit = new ControllerMessageHandlerActor.Init(agentServices);
|
||||||
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
var rpcMessageHandlerActor = agentServices.ActorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
||||||
|
|
||||||
rpcClientListener = rpcClient.Listen(rpcMessageHandlerActor);
|
rpcClientListener = rpcClient.Listen(new IMessageReceiver<IMessageToAgent>.Actor(rpcMessageHandlerActor));
|
||||||
|
|
||||||
if (await agentServices.Register(controllerConnection, shutdownCancellationToken)) {
|
if (await agentServices.Register(controllerConnection, shutdownCancellationToken)) {
|
||||||
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
|
PhantomLogger.Root.Information("Phantom Panel agent is ready.");
|
||||||
|
@@ -22,7 +22,6 @@ public static class AgentMessageRegistries {
|
|||||||
|
|
||||||
ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
|
ToController.Add<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(0);
|
||||||
ToController.Add<UnregisterAgentMessage>(1);
|
ToController.Add<UnregisterAgentMessage>(1);
|
||||||
ToController.Add<AgentIsAliveMessage>(2);
|
|
||||||
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
|
ToController.Add<AdvertiseJavaRuntimesMessage>(3);
|
||||||
ToController.Add<ReportInstanceStatusMessage>(4);
|
ToController.Add<ReportInstanceStatusMessage>(4);
|
||||||
ToController.Add<InstanceOutputMessage>(5);
|
ToController.Add<InstanceOutputMessage>(5);
|
||||||
|
@@ -1,6 +0,0 @@
|
|||||||
using MemoryPack;
|
|
||||||
|
|
||||||
namespace Phantom.Common.Messages.Agent.ToController;
|
|
||||||
|
|
||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
|
||||||
public sealed partial record AgentIsAliveMessage : IMessageToController;
|
|
@@ -1,8 +0,0 @@
|
|||||||
using MemoryPack;
|
|
||||||
|
|
||||||
namespace Phantom.Common.Messages.Web.ToWeb;
|
|
||||||
|
|
||||||
[MemoryPackable(GenerateType.VersionTolerant)]
|
|
||||||
public sealed partial record RegisterWebResultMessage(
|
|
||||||
[property: MemoryPackOrder(0)] bool Success
|
|
||||||
) : IMessageToWeb;
|
|
@@ -22,8 +22,9 @@ sealed class AgentConnection(Guid agentGuid, string agentName) {
|
|||||||
|
|
||||||
public bool CloseIfSame(RpcServerToClientConnection<IMessageToController, IMessageToAgent> expected) {
|
public bool CloseIfSame(RpcServerToClientConnection<IMessageToController, IMessageToAgent> expected) {
|
||||||
lock (this) {
|
lock (this) {
|
||||||
if (connection != null && connection.IsSame(expected)) {
|
if (connection != null && ReferenceEquals(connection, expected)) {
|
||||||
connection.CloseSession();
|
connection.CloseSession();
|
||||||
|
connection = null;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -28,7 +28,8 @@ sealed class AgentManager {
|
|||||||
private readonly IDbContextProvider dbProvider;
|
private readonly IDbContextProvider dbProvider;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByGuid = new ();
|
private readonly ConcurrentDictionary<Guid, ActorRef<AgentActor.ICommand>> agentsByAgentGuid = new ();
|
||||||
|
private readonly ConcurrentDictionary<Guid, Guid> agentGuidsBySessionGuid = new ();
|
||||||
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
|
private readonly Func<Guid, AgentConfiguration, ActorRef<AgentActor.ICommand>> addAgentActorFactory;
|
||||||
|
|
||||||
public AgentManager(IActorRefFactory actorSystem, ControllerState controllerState, MinecraftVersions minecraftVersions, UserLoginManager userLoginManager, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
|
public AgentManager(IActorRefFactory actorSystem, ControllerState controllerState, MinecraftVersions minecraftVersions, UserLoginManager userLoginManager, IDbContextProvider dbProvider, CancellationToken cancellationToken) {
|
||||||
@@ -55,7 +56,7 @@ sealed class AgentManager {
|
|||||||
var agentGuid = entity.AgentGuid;
|
var agentGuid = entity.AgentGuid;
|
||||||
var agentConfiguration = new AgentConfiguration(entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
|
var agentConfiguration = new AgentConfiguration(entity.Name, entity.ProtocolVersion, entity.BuildVersion, entity.MaxInstances, entity.MaxMemory);
|
||||||
|
|
||||||
if (agentsByGuid.TryAdd(agentGuid, CreateAgentActor(agentGuid, agentConfiguration))) {
|
if (agentsByAgentGuid.TryAdd(agentGuid, CreateAgentActor(agentGuid, agentConfiguration))) {
|
||||||
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentConfiguration.AgentName, agentGuid);
|
Logger.Information("Loaded agent \"{AgentName}\" (GUID {AgentGuid}) from database.", agentConfiguration.AgentName, agentGuid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -63,12 +64,13 @@ sealed class AgentManager {
|
|||||||
|
|
||||||
public async Task<ImmutableArray<ConfigureInstanceMessage>> RegisterAgent(AgentInfo agentInfo, RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
public async Task<ImmutableArray<ConfigureInstanceMessage>> RegisterAgent(AgentInfo agentInfo, RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
||||||
var agentProperties = AgentConfiguration.From(agentInfo);
|
var agentProperties = AgentConfiguration.From(agentInfo);
|
||||||
var agentActor = agentsByGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
var agentActor = agentsByAgentGuid.GetOrAdd(agentInfo.AgentGuid, addAgentActorFactory, agentProperties);
|
||||||
|
agentGuidsBySessionGuid[connection.SessionId] = agentInfo.AgentGuid;
|
||||||
return await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
return await agentActor.Request(new AgentActor.RegisterCommand(agentProperties, connection), cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
public bool TellAgent(Guid agentGuid, AgentActor.ICommand command) {
|
||||||
if (agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
if (agentsByAgentGuid.TryGetValue(agentGuid, out var agent)) {
|
||||||
agent.Tell(command);
|
agent.Tell(command);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -84,7 +86,7 @@ sealed class AgentManager {
|
|||||||
return (UserInstanceActionFailure) UserActionFailure.NotAuthorized;
|
return (UserInstanceActionFailure) UserActionFailure.NotAuthorized;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!agentsByGuid.TryGetValue(agentGuid, out var agent)) {
|
if (!agentsByAgentGuid.TryGetValue(agentGuid, out var agent)) {
|
||||||
return (UserInstanceActionFailure) InstanceActionFailure.AgentDoesNotExist;
|
return (UserInstanceActionFailure) InstanceActionFailure.AgentDoesNotExist;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,4 +94,12 @@ sealed class AgentManager {
|
|||||||
var result = await agent.Request(command, cancellationToken);
|
var result = await agent.Request(command, cancellationToken);
|
||||||
return result.MapError(static error => (UserInstanceActionFailure) error);
|
return result.MapError(static error => (UserInstanceActionFailure) error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public bool TryGetAgentGuidBySessionGuid(Guid sessionGuid, out Guid agentGuid) {
|
||||||
|
return agentGuidsBySessionGuid.TryGetValue(sessionGuid, out agentGuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnSessionClosed(Guid sessionId, Guid agentGuid) {
|
||||||
|
agentGuidsBySessionGuid.TryRemove(new KeyValuePair<Guid, Guid>(sessionId, agentGuid));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ using Phantom.Controller.Services.Agents;
|
|||||||
using Phantom.Controller.Services.Events;
|
using Phantom.Controller.Services.Events;
|
||||||
using Phantom.Controller.Services.Instances;
|
using Phantom.Controller.Services.Instances;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Server;
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
@@ -14,8 +15,17 @@ sealed class AgentClientRegistrar(
|
|||||||
InstanceLogManager instanceLogManager,
|
InstanceLogManager instanceLogManager,
|
||||||
EventLogManager eventLogManager
|
EventLogManager eventLogManager
|
||||||
) : IRpcServerClientRegistrar<IMessageToController, IMessageToAgent> {
|
) : IRpcServerClientRegistrar<IMessageToController, IMessageToAgent> {
|
||||||
public ActorRef<IMessageToController> Register(Guid sessionId, RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
public IMessageReceiver<IMessageToController> Register(RpcServerToClientConnection<IMessageToController, IMessageToAgent> connection) {
|
||||||
|
var name = "AgentClient-" + connection.SessionId;
|
||||||
var init = new AgentMessageHandlerActor.Init(connection, agentManager, instanceLogManager, eventLogManager);
|
var init = new AgentMessageHandlerActor.Init(connection, agentManager, instanceLogManager, eventLogManager);
|
||||||
return actorSystem.ActorOf(AgentMessageHandlerActor.Factory(init), "AgentClient-" + sessionId);
|
return new Receiver(connection.SessionId, agentManager, actorSystem.ActorOf(AgentMessageHandlerActor.Factory(init), name));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class Receiver(Guid sessionId, AgentManager agentManager, ActorRef<IMessageToController> actor) : IMessageReceiver<IMessageToController>.Actor(actor) {
|
||||||
|
public override void OnPing() {
|
||||||
|
if (agentManager.TryGetAgentGuidBySessionGuid(sessionId, out var agentGuid)) {
|
||||||
|
agentManager.TellAgent(agentGuid, new AgentActor.NotifyIsAliveCommand());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -22,7 +22,7 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
private readonly InstanceLogManager instanceLogManager;
|
private readonly InstanceLogManager instanceLogManager;
|
||||||
private readonly EventLogManager eventLogManager;
|
private readonly EventLogManager eventLogManager;
|
||||||
|
|
||||||
private Guid? agentGuid;
|
private Guid? registeredAgentGuid;
|
||||||
|
|
||||||
private AgentMessageHandlerActor(Init init) {
|
private AgentMessageHandlerActor(Init init) {
|
||||||
this.connection = init.Connection;
|
this.connection = init.Connection;
|
||||||
@@ -32,7 +32,6 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
|
|
||||||
ReceiveAsyncAndReply<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(HandleRegisterAgent);
|
ReceiveAsyncAndReply<RegisterAgentMessage, ImmutableArray<ConfigureInstanceMessage>>(HandleRegisterAgent);
|
||||||
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
|
Receive<UnregisterAgentMessage>(HandleUnregisterAgent);
|
||||||
Receive<AgentIsAliveMessage>(HandleAgentIsAlive);
|
|
||||||
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
|
Receive<AdvertiseJavaRuntimesMessage>(HandleAdvertiseJavaRuntimes);
|
||||||
Receive<ReportAgentStatusMessage>(HandleReportAgentStatus);
|
Receive<ReportAgentStatusMessage>(HandleReportAgentStatus);
|
||||||
Receive<ReportInstanceStatusMessage>(HandleReportInstanceStatus);
|
Receive<ReportInstanceStatusMessage>(HandleReportInstanceStatus);
|
||||||
@@ -42,23 +41,21 @@ sealed class AgentMessageHandlerActor : ReceiveActor<IMessageToController> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Guid RequireAgentGuid() {
|
private Guid RequireAgentGuid() {
|
||||||
return agentGuid ?? throw new InvalidOperationException("Agent has not registered yet.");
|
return registeredAgentGuid ?? throw new InvalidOperationException("Agent has not registered yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private Task<ImmutableArray<ConfigureInstanceMessage>> HandleRegisterAgent(RegisterAgentMessage message) {
|
private Task<ImmutableArray<ConfigureInstanceMessage>> HandleRegisterAgent(RegisterAgentMessage message) {
|
||||||
agentGuid = message.AgentInfo.AgentGuid;
|
registeredAgentGuid = message.AgentInfo.AgentGuid;
|
||||||
return agentManager.RegisterAgent(message.AgentInfo, connection);
|
return agentManager.RegisterAgent(message.AgentInfo, connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
|
private void HandleUnregisterAgent(UnregisterAgentMessage message) {
|
||||||
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UnregisterCommand(connection));
|
Guid agentGuid = RequireAgentGuid();
|
||||||
|
agentManager.TellAgent(agentGuid, new AgentActor.UnregisterCommand(connection));
|
||||||
|
agentManager.OnSessionClosed(connection.SessionId, agentGuid);
|
||||||
connection.CloseSession();
|
connection.CloseSession();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleAgentIsAlive(AgentIsAliveMessage message) {
|
|
||||||
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.NotifyIsAliveCommand());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
private void HandleAdvertiseJavaRuntimes(AdvertiseJavaRuntimesMessage message) {
|
||||||
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
|
agentManager.TellAgent(RequireAgentGuid(), new AgentActor.UpdateJavaRuntimesCommand(message.Runtimes));
|
||||||
}
|
}
|
||||||
|
@@ -7,6 +7,7 @@ using Phantom.Controller.Services.Instances;
|
|||||||
using Phantom.Controller.Services.Users;
|
using Phantom.Controller.Services.Users;
|
||||||
using Phantom.Controller.Services.Users.Sessions;
|
using Phantom.Controller.Services.Users.Sessions;
|
||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Server;
|
using Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
namespace Phantom.Controller.Services.Rpc;
|
namespace Phantom.Controller.Services.Rpc;
|
||||||
@@ -24,8 +25,9 @@ sealed class WebClientRegistrar(
|
|||||||
MinecraftVersions minecraftVersions,
|
MinecraftVersions minecraftVersions,
|
||||||
EventLogManager eventLogManager
|
EventLogManager eventLogManager
|
||||||
) : IRpcServerClientRegistrar<IMessageToController, IMessageToWeb> {
|
) : IRpcServerClientRegistrar<IMessageToController, IMessageToWeb> {
|
||||||
public ActorRef<IMessageToController> Register(Guid sessionId, RpcServerToClientConnection<IMessageToController, IMessageToWeb> connection) {
|
public IMessageReceiver<IMessageToController> Register(RpcServerToClientConnection<IMessageToController, IMessageToWeb> connection) {
|
||||||
|
var name = "WebClient-" + connection.SessionId;
|
||||||
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
var init = new WebMessageHandlerActor.Init(connection, this, controllerState, instanceLogManager, userManager, roleManager, userRoleManager, userLoginManager, auditLogManager, agentManager, minecraftVersions, eventLogManager);
|
||||||
return actorSystem.ActorOf(WebMessageHandlerActor.Factory(init), "WebClient-" + sessionId);
|
return new IMessageReceiver<IMessageToController>.Actor(actorSystem.ActorOf(WebMessageHandlerActor.Factory(init), name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -26,7 +26,7 @@ sealed record Variables(
|
|||||||
|
|
||||||
EndPoint webRpcServerHost = new IPEndPoint(
|
EndPoint webRpcServerHost = new IPEndPoint(
|
||||||
EnvironmentVariables.GetIpAddress("WEB_RPC_SERVER_HOST").WithDefault(IPAddress.Any),
|
EnvironmentVariables.GetIpAddress("WEB_RPC_SERVER_HOST").WithDefault(IPAddress.Any),
|
||||||
EnvironmentVariables.GetPortNumber("WEB_RPC_SERVER_PORT").WithDefault(9401)
|
EnvironmentVariables.GetPortNumber("WEB_RPC_SERVER_PORT").WithDefault(9402)
|
||||||
);
|
);
|
||||||
|
|
||||||
return new Variables(
|
return new Variables(
|
||||||
|
@@ -23,7 +23,6 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" />
|
<PackageReference Update="BCrypt.Net-Next.StrongName" Version="4.0.3" />
|
||||||
<PackageReference Update="MemoryPack" Version="1.10.0" />
|
<PackageReference Update="MemoryPack" Version="1.10.0" />
|
||||||
<PackageReference Update="NetMQ" Version="4.0.1.13" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@@ -31,27 +31,33 @@ public static class PhantomLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>() {
|
public static ILogger Create<T>() {
|
||||||
return Create(typeof(T).Name);
|
return Create(TypeName<T>());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>(string name) {
|
public static ILogger Create<T>(string name) {
|
||||||
return Create(ConcatNames(typeof(T).Name, name));
|
return Create(ConcatNames(TypeName<T>(), name));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T>(string name1, string name2) {
|
public static ILogger Create<T>(string name1, string name2) {
|
||||||
return Create(ConcatNames(typeof(T).Name, name1, name2));
|
return Create(ConcatNames(TypeName<T>(), name1, name2));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>() {
|
public static ILogger Create<T1, T2>() {
|
||||||
return Create(ConcatNames(typeof(T1).Name, typeof(T2).Name));
|
return Create(ConcatNames(TypeName<T1>(), TypeName<T2>()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>(string name) {
|
public static ILogger Create<T1, T2>(string name) {
|
||||||
return Create(ConcatNames(typeof(T1).Name, typeof(T2).Name, name));
|
return Create(ConcatNames(TypeName<T1>(), TypeName<T2>(), name));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ILogger Create<T1, T2>(string name1, string name2) {
|
public static ILogger Create<T1, T2>(string name1, string name2) {
|
||||||
return Create(ConcatNames(typeof(T1).Name, typeof(T2).Name, ConcatNames(name1, name2)));
|
return Create(ConcatNames(TypeName<T1>(), TypeName<T2>(), ConcatNames(name1, name2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string TypeName<T>() {
|
||||||
|
string typeName = typeof(T).Name;
|
||||||
|
int genericsStartIndex = typeName.IndexOf('`');
|
||||||
|
return genericsStartIndex > 0 ? typeName[..genericsStartIndex] : typeName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static string ConcatNames(string name1, string name2) {
|
public static string ConcatNames(string name1, string name2) {
|
||||||
@@ -62,6 +68,11 @@ public static class PhantomLogger {
|
|||||||
return ConcatNames(name1, ConcatNames(name2, name3));
|
return ConcatNames(name1, ConcatNames(name2, name3));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static string ShortenGuid(Guid guid) {
|
||||||
|
var prefix = guid.ToString();
|
||||||
|
return prefix[..prefix.IndexOf('-')];
|
||||||
|
}
|
||||||
|
|
||||||
public static void Dispose() {
|
public static void Dispose() {
|
||||||
Root.Dispose();
|
Root.Dispose();
|
||||||
Base.Dispose();
|
Base.Dispose();
|
||||||
|
@@ -4,11 +4,13 @@ namespace Phantom.Utils.Rpc.Frame;
|
|||||||
|
|
||||||
interface IFrame {
|
interface IFrame {
|
||||||
private const byte TypePingId = 0;
|
private const byte TypePingId = 0;
|
||||||
private const byte TypeMessageId = 1;
|
private const byte TypePongId = 1;
|
||||||
private const byte TypeReplyId = 2;
|
private const byte TypeMessageId = 2;
|
||||||
private const byte TypeErrorId = 3;
|
private const byte TypeReplyId = 3;
|
||||||
|
private const byte TypeErrorId = 4;
|
||||||
|
|
||||||
static readonly ReadOnlyMemory<byte> TypePing = new ([TypePingId]);
|
static readonly ReadOnlyMemory<byte> TypePing = new ([TypePingId]);
|
||||||
|
static readonly ReadOnlyMemory<byte> TypePong = new ([TypePongId]);
|
||||||
static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]);
|
static readonly ReadOnlyMemory<byte> TypeMessage = new ([TypeMessageId]);
|
||||||
static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]);
|
static readonly ReadOnlyMemory<byte> TypeReply = new ([TypeReplyId]);
|
||||||
static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]);
|
static readonly ReadOnlyMemory<byte> TypeError = new ([TypeErrorId]);
|
||||||
@@ -21,6 +23,13 @@ interface IFrame {
|
|||||||
|
|
||||||
switch (oneByteBuffer[0]) {
|
switch (oneByteBuffer[0]) {
|
||||||
case TypePingId:
|
case TypePingId:
|
||||||
|
var pingTime = await PingFrame.Read(stream, cancellationToken);
|
||||||
|
await reader.OnPing(pingTime, cancellationToken);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TypePongId:
|
||||||
|
var pongFrame = await PongFrame.Read(stream, cancellationToken);
|
||||||
|
reader.OnPongFrame(pongFrame);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TypeMessageId:
|
case TypeMessageId:
|
||||||
|
@@ -3,6 +3,8 @@
|
|||||||
namespace Phantom.Utils.Rpc.Frame;
|
namespace Phantom.Utils.Rpc.Frame;
|
||||||
|
|
||||||
interface IFrameReader {
|
interface IFrameReader {
|
||||||
|
ValueTask OnPing(DateTimeOffset pingTime, CancellationToken cancellationToken);
|
||||||
|
void OnPongFrame(PongFrame frame);
|
||||||
Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken);
|
Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken);
|
||||||
void OnReplyFrame(ReplyFrame frame);
|
void OnReplyFrame(ReplyFrame frame);
|
||||||
void OnErrorFrame(ErrorFrame frame);
|
void OnErrorFrame(ErrorFrame frame);
|
||||||
|
@@ -1,11 +1,15 @@
|
|||||||
namespace Phantom.Utils.Rpc.Frame.Types;
|
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||||
|
|
||||||
sealed record PingFrame : IFrame {
|
sealed record PingFrame : IFrame {
|
||||||
public static PingFrame Instance { get; } = new PingFrame();
|
public static PingFrame Instance { get; } = new ();
|
||||||
|
|
||||||
public ReadOnlyMemory<byte> FrameType => IFrame.TypePing;
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypePing;
|
||||||
|
|
||||||
public Task Write(Stream stream, CancellationToken cancellationToken) {
|
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
return Task.CompletedTask;
|
await Serialization.WriteSignedLong(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), stream, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async Task<DateTimeOffset> Read(Stream stream, CancellationToken cancellationToken) {
|
||||||
|
return DateTimeOffset.FromUnixTimeMilliseconds(await Serialization.ReadSignedLong(stream, cancellationToken));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
13
Utils/Phantom.Utils.Rpc/Frame/Types/PongFrame.cs
Normal file
13
Utils/Phantom.Utils.Rpc/Frame/Types/PongFrame.cs
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
namespace Phantom.Utils.Rpc.Frame.Types;
|
||||||
|
|
||||||
|
sealed record PongFrame(DateTimeOffset PingTime) : IFrame {
|
||||||
|
public ReadOnlyMemory<byte> FrameType => IFrame.TypePong;
|
||||||
|
|
||||||
|
public async Task Write(Stream stream, CancellationToken cancellationToken) {
|
||||||
|
await Serialization.WriteSignedLong(PingTime.ToUnixTimeMilliseconds(), stream, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async Task<PongFrame> Read(Stream stream, CancellationToken cancellationToken) {
|
||||||
|
return new PongFrame(DateTimeOffset.FromUnixTimeMilliseconds(await Serialization.ReadSignedLong(stream, cancellationToken)));
|
||||||
|
}
|
||||||
|
}
|
5
Utils/Phantom.Utils.Rpc/IRpcListener.cs
Normal file
5
Utils/Phantom.Utils.Rpc/IRpcListener.cs
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
|
public interface IRpcListener {
|
||||||
|
|
||||||
|
}
|
8
Utils/Phantom.Utils.Rpc/IRpcReplySender.cs
Normal file
8
Utils/Phantom.Utils.Rpc/IRpcReplySender.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
|
interface IRpcReplySender {
|
||||||
|
ValueTask SendReply<TReply>(uint replyingToMessageId, TReply reply, CancellationToken cancellationToken);
|
||||||
|
ValueTask SendError(uint replyingToMessageId, RpcError error, CancellationToken cancellationToken);
|
||||||
|
}
|
21
Utils/Phantom.Utils.Rpc/Message/IMessageReceiver.cs
Normal file
21
Utils/Phantom.Utils.Rpc/Message/IMessageReceiver.cs
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
using Phantom.Utils.Actor;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
public interface IMessageReceiver<TMessageBase> {
|
||||||
|
void OnPing();
|
||||||
|
void OnMessage(TMessageBase message);
|
||||||
|
Task<TReply> OnMessage<TMessage, TReply>(TMessage message, CancellationToken cancellationToken = default) where TMessage : TMessageBase, ICanReply<TReply>;
|
||||||
|
|
||||||
|
class Actor(ActorRef<TMessageBase> actor) : IMessageReceiver<TMessageBase> {
|
||||||
|
public virtual void OnPing() {}
|
||||||
|
|
||||||
|
public void OnMessage(TMessageBase message) {
|
||||||
|
actor.Tell(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TReply> OnMessage<TMessage, TReply>(TMessage message, CancellationToken cancellationToken = default) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||||
|
return actor.Request(message, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -1,11 +0,0 @@
|
|||||||
using Phantom.Utils.Actor;
|
|
||||||
using Phantom.Utils.Rpc.Runtime;
|
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Message;
|
|
||||||
|
|
||||||
interface MessageHandler<TMessageBase> {
|
|
||||||
ActorRef<TMessageBase> Actor { get; }
|
|
||||||
|
|
||||||
ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply>;
|
|
||||||
ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken);
|
|
||||||
}
|
|
@@ -9,7 +9,7 @@ namespace Phantom.Utils.Rpc.Message;
|
|||||||
public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
||||||
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
|
private readonly Dictionary<Type, ushort> typeToCodeMapping = new ();
|
||||||
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
|
private readonly Dictionary<ushort, Type> codeToTypeMapping = new ();
|
||||||
private readonly Dictionary<ushort, Func<uint, ReadOnlyMemory<byte>, MessageHandler<TMessageBase>, CancellationToken, Task>> codeToHandlerMapping = new ();
|
private readonly Dictionary<ushort, Func<uint, ReadOnlyMemory<byte>, RpcMessageHandler<TMessageBase>, CancellationToken, Task>> codeToHandlerMapping = new ();
|
||||||
|
|
||||||
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
|
public void Add<TMessage>(ushort code) where TMessage : TMessageBase {
|
||||||
if (HasReplyType(typeof(TMessage))) {
|
if (HasReplyType(typeof(TMessage))) {
|
||||||
@@ -50,7 +50,7 @@ public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task Handle(MessageFrame frame, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) {
|
internal async Task Handle(MessageFrame frame, RpcMessageHandler<TMessageBase> handler, CancellationToken cancellationToken) {
|
||||||
uint messageId = frame.MessageId;
|
uint messageId = frame.MessageId;
|
||||||
|
|
||||||
if (codeToHandlerMapping.TryGetValue(frame.RegistryCode, out var action)) {
|
if (codeToHandlerMapping.TryGetValue(frame.RegistryCode, out var action)) {
|
||||||
@@ -58,47 +58,47 @@ public sealed class MessageRegistry<TMessageBase>(ILogger logger) {
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logger.Error("Unknown message code {Code} for message {MessageId}.", frame.RegistryCode, messageId);
|
logger.Error("Unknown message code {Code} for message {MessageId}.", frame.RegistryCode, messageId);
|
||||||
await handler.OnError(messageId, RpcError.UnknownMessageRegistryCode, cancellationToken);
|
await handler.SendError(messageId, RpcError.UnknownMessageRegistryCode, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DeserializationHandler<TMessage>(uint messageId, ReadOnlyMemory<byte> serializedMessage, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase {
|
private async Task DeserializationHandler<TMessage>(uint messageId, ReadOnlyMemory<byte> serializedMessage, RpcMessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase {
|
||||||
TMessage message;
|
TMessage message;
|
||||||
try {
|
try {
|
||||||
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||||
await handler.OnError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
await handler.SendError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.Actor.Tell(message);
|
handler.Receiver.OnMessage(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DeserializationHandler<TMessage, TReply>(uint messageId, ReadOnlyMemory<byte> serializedMessage, MessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
|
private async Task DeserializationHandler<TMessage, TReply>(uint messageId, ReadOnlyMemory<byte> serializedMessage, RpcMessageHandler<TMessageBase> handler, CancellationToken cancellationToken) where TMessage : TMessageBase, ICanReply<TReply> {
|
||||||
TMessage message;
|
TMessage message;
|
||||||
try {
|
try {
|
||||||
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
message = Serialization.Deserialize<TMessage>(serializedMessage);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
logger.Error(e, "Could not deserialize message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||||
await handler.OnError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
await handler.SendError(messageId, RpcError.MessageDeserializationError, cancellationToken);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TReply reply;
|
TReply reply;
|
||||||
try {
|
try {
|
||||||
reply = await handler.Actor.Request(message, cancellationToken);
|
reply = await handler.Receiver.OnMessage<TMessage, TReply>(message, cancellationToken);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Could not handle message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
logger.Error(e, "Could not handle message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||||
await handler.OnError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
await handler.SendError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await handler.OnReply<TMessage, TReply>(messageId, reply, cancellationToken);
|
await handler.SendReply(messageId, reply, cancellationToken);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Could not reply to message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
logger.Error(e, "Could not reply to message {MessageId} ({MessageType}).", messageId, typeof(TMessage).Name);
|
||||||
await handler.OnError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
await handler.SendError(messageId, RpcError.MessageHandlingError, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -8,7 +8,7 @@ namespace Phantom.Utils.Rpc.Message;
|
|||||||
|
|
||||||
sealed class MessageReplyTracker {
|
sealed class MessageReplyTracker {
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly ConcurrentDictionary<uint, TaskCompletionSource<ReadOnlyMemory<byte>>> replyTasks = new (concurrencyLevel: 4, capacity: 16);
|
private readonly ConcurrentDictionary<uint, TaskCompletionSource<ReadOnlyMemory<byte>>> replyTasks = new (concurrencyLevel: 2, capacity: 16);
|
||||||
|
|
||||||
internal MessageReplyTracker(string loggerName) {
|
internal MessageReplyTracker(string loggerName) {
|
||||||
this.logger = PhantomLogger.Create<MessageReplyTracker>(loggerName);
|
this.logger = PhantomLogger.Create<MessageReplyTracker>(loggerName);
|
||||||
|
@@ -7,7 +7,6 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="MemoryPack" />
|
<PackageReference Include="MemoryPack" />
|
||||||
<PackageReference Include="NetMQ" />
|
|
||||||
<PackageReference Include="Serilog" />
|
<PackageReference Include="Serilog" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
@@ -1,6 +1,4 @@
|
|||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Logging;
|
|
||||||
using Phantom.Utils.Rpc.Frame;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
@@ -29,31 +27,16 @@ public sealed class RpcClient<TClientToServerMessage, TServerToClientMessage> :
|
|||||||
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters.Common, this.connection, messageDefinitions.ToServer);
|
this.SendChannel = new RpcSendChannel<TClientToServerMessage>(loggerName, connectionParameters.Common, this.connection, messageDefinitions.ToServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Listen(ActorRef<TServerToClientMessage> actor) {
|
public async Task Listen(IMessageReceiver<TServerToClientMessage> receiver) {
|
||||||
var handler = new MessageHandlerImpl(SendChannel, actor);
|
var messageHandler = new RpcMessageHandler<TServerToClientMessage>(receiver, SendChannel);
|
||||||
|
var frameReader = new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, serverToClientMessageRegistry, messageHandler, SendChannel);
|
||||||
try {
|
try {
|
||||||
await connection.ReadConnection(stream => Receive(stream, handler));
|
await connection.ReadConnection(frameReader, CancellationToken.None);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
|
||||||
await IFrame.ReadFrom(stream, new RpcFrameReader<TClientToServerMessage, TServerToClientMessage>(loggerName, serverToClientMessageRegistry, handler, SendChannel), CancellationToken.None);
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed class MessageHandlerImpl(RpcSendChannel<TClientToServerMessage> sendChannel, ActorRef<TServerToClientMessage> actor) : MessageHandler<TServerToClientMessage> {
|
|
||||||
public ActorRef<TServerToClientMessage> Actor => actor;
|
|
||||||
|
|
||||||
public ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TServerToClientMessage, ICanReply<TReply> {
|
|
||||||
return sendChannel.SendReply(messageId, reply, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken) {
|
|
||||||
return sendChannel.SendError(messageId, error, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Shutdown() {
|
public async Task Shutdown() {
|
||||||
logger.Information("Shutting down client...");
|
logger.Information("Shutting down client...");
|
||||||
|
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Frame;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Client;
|
namespace Phantom.Utils.Rpc.Runtime.Client;
|
||||||
@@ -30,7 +31,7 @@ sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerCon
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ReadConnection(Func<Stream, Task> reader) {
|
public async Task ReadConnection(IFrameReader frameReader, CancellationToken cancellationToken) {
|
||||||
RpcClientToServerConnector.Connection? connection = null;
|
RpcClientToServerConnector.Connection? connection = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -48,7 +49,7 @@ sealed class RpcClientToServerConnection(string loggerName, RpcClientToServerCon
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await reader(connection.Stream);
|
await IFrame.ReadFrom(connection.Stream, frameReader, cancellationToken);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
throw;
|
throw;
|
||||||
} catch (EndOfStreamException) {
|
} catch (EndOfStreamException) {
|
||||||
|
@@ -6,9 +6,23 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(string loggerName, MessageRegistry<TReceivedMessage> messageRegistry, MessageHandler<TReceivedMessage> messageHandler, RpcSendChannel<TSentMessage> sendChannel) : IFrameReader {
|
sealed class RpcFrameReader<TSentMessage, TReceivedMessage>(
|
||||||
|
string loggerName,
|
||||||
|
MessageRegistry<TReceivedMessage> messageRegistry,
|
||||||
|
RpcMessageHandler<TReceivedMessage> messageHandler,
|
||||||
|
RpcSendChannel<TSentMessage> sendChannel
|
||||||
|
) : IFrameReader {
|
||||||
private readonly ILogger logger = PhantomLogger.Create<RpcFrameReader<TSentMessage, TReceivedMessage>>(loggerName);
|
private readonly ILogger logger = PhantomLogger.Create<RpcFrameReader<TSentMessage, TReceivedMessage>>(loggerName);
|
||||||
|
|
||||||
|
public ValueTask OnPing(DateTimeOffset pingTime, CancellationToken cancellationToken) {
|
||||||
|
messageHandler.OnPing();
|
||||||
|
return sendChannel.SendPong(pingTime, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void OnPongFrame(PongFrame frame) {
|
||||||
|
sendChannel.ReceivePong(frame);
|
||||||
|
}
|
||||||
|
|
||||||
public Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken) {
|
public Task OnMessageFrame(MessageFrame frame, CancellationToken cancellationToken) {
|
||||||
if (messageRegistry.TryGetType(frame, out var messageType)) {
|
if (messageRegistry.TryGetType(frame, out var messageType)) {
|
||||||
logger.Verbose("Received message {MesageId} of type {MessageType} ({Bytes} B).", frame.MessageId, messageType.Name, frame.SerializedMessage.Length);
|
logger.Verbose("Received message {MesageId} of type {MessageType} ({Bytes} B).", frame.MessageId, messageType.Name, frame.SerializedMessage.Length);
|
||||||
|
19
Utils/Phantom.Utils.Rpc/Runtime/RpcMessageHandler.cs
Normal file
19
Utils/Phantom.Utils.Rpc/Runtime/RpcMessageHandler.cs
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
sealed class RpcMessageHandler<TMessageBase>(IMessageReceiver<TMessageBase> receiver, IRpcReplySender replySender) {
|
||||||
|
public IMessageReceiver<TMessageBase> Receiver => receiver;
|
||||||
|
|
||||||
|
public void OnPing() {
|
||||||
|
receiver.OnPing();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask SendReply<TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) {
|
||||||
|
return replySender.SendReply(messageId, reply, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask SendError(uint messageId, RpcError error, CancellationToken cancellationToken) {
|
||||||
|
return replySender.SendError(messageId, error, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
@@ -9,7 +9,7 @@ using Serilog;
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
public sealed class RpcSendChannel<TMessageBase> : IRpcReplySender, IDisposable {
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
private readonly IRpcConnectionProvider connectionProvider;
|
private readonly IRpcConnectionProvider connectionProvider;
|
||||||
private readonly MessageRegistry<TMessageBase> messageRegistry;
|
private readonly MessageRegistry<TMessageBase> messageRegistry;
|
||||||
@@ -22,6 +22,7 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
private readonly CancellationTokenSource pingCancellationTokenSource = new ();
|
private readonly CancellationTokenSource pingCancellationTokenSource = new ();
|
||||||
|
|
||||||
private uint nextMessageId;
|
private uint nextMessageId;
|
||||||
|
private TaskCompletionSource<DateTimeOffset>? pongTask;
|
||||||
|
|
||||||
internal RpcSendChannel(string loggerName, RpcCommonConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
|
internal RpcSendChannel(string loggerName, RpcCommonConnectionParameters connectionParameters, IRpcConnectionProvider connectionProvider, MessageRegistry<TMessageBase> messageRegistry) {
|
||||||
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
|
this.logger = PhantomLogger.Create<RpcSendChannel<TMessageBase>>(loggerName);
|
||||||
@@ -40,6 +41,10 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
this.pingTask = Ping(connectionParameters.PingInterval);
|
this.pingTask = Ping(connectionParameters.PingInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal async ValueTask SendPong(DateTimeOffset pingTime, CancellationToken cancellationToken) {
|
||||||
|
await SendFrame(new PongFrame(pingTime), cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase {
|
public bool TrySendMessage<TMessage>(TMessage message) where TMessage : TMessageBase {
|
||||||
return sendQueue.Writer.TryWrite(NextMessageFrame(message));
|
return sendQueue.Writer.TryWrite(NextMessageFrame(message));
|
||||||
}
|
}
|
||||||
@@ -63,11 +68,11 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
return await messageReplyTracker.WaitForReply<TReply>(messageId, waitForReplyTime, cancellationToken);
|
return await messageReplyTracker.WaitForReply<TReply>(messageId, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async ValueTask SendReply<TReply>(uint replyingToMessageId, TReply reply, CancellationToken cancellationToken) {
|
async ValueTask IRpcReplySender.SendReply<TReply>(uint replyingToMessageId, TReply reply, CancellationToken cancellationToken) {
|
||||||
await SendFrame(new ReplyFrame(replyingToMessageId, Serialization.Serialize(reply)), cancellationToken);
|
await SendFrame(new ReplyFrame(replyingToMessageId, Serialization.Serialize(reply)), cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async ValueTask SendError(uint replyingToMessageId, RpcError error, CancellationToken cancellationToken) {
|
async ValueTask IRpcReplySender.SendError(uint replyingToMessageId, RpcError error, CancellationToken cancellationToken) {
|
||||||
await SendFrame(new ErrorFrame(replyingToMessageId, error), cancellationToken);
|
await SendFrame(new ErrorFrame(replyingToMessageId, error), cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,18 +90,17 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
private async Task ProcessSendQueue() {
|
private async Task ProcessSendQueue() {
|
||||||
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync()) {
|
await foreach (IFrame frame in sendQueue.Reader.ReadAllAsync()) {
|
||||||
while (true) {
|
while (true) {
|
||||||
Stream stream;
|
|
||||||
try {
|
try {
|
||||||
stream = await connectionProvider.GetStream();
|
Stream stream = await connectionProvider.GetStream();
|
||||||
|
await stream.WriteAsync(frame.FrameType);
|
||||||
|
await frame.Write(stream);
|
||||||
|
await stream.FlushAsync();
|
||||||
|
break;
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
throw;
|
throw;
|
||||||
} catch (Exception) {
|
} catch (Exception) {
|
||||||
continue;
|
// Retry.
|
||||||
}
|
}
|
||||||
|
|
||||||
await stream.WriteAsync(frame.FrameType);
|
|
||||||
await frame.Write(stream);
|
|
||||||
await stream.FlushAsync();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -107,14 +111,26 @@ public sealed class RpcSendChannel<TMessageBase> : IDisposable {
|
|||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
await Task.Delay(interval, cancellationToken);
|
await Task.Delay(interval, cancellationToken);
|
||||||
// TODO wait for pong?
|
|
||||||
|
pongTask = new TaskCompletionSource<DateTimeOffset>();
|
||||||
|
|
||||||
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
|
if (!sendQueue.Writer.TryWrite(PingFrame.Instance)) {
|
||||||
logger.Warning("Skipped a ping due to a full queue.");
|
logger.Warning("Skipped a ping due to a full queue.");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DateTimeOffset pingTime = await pongTask.Task;
|
||||||
|
DateTimeOffset currentTime = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
TimeSpan roundTripTime = currentTime - pingTime;
|
||||||
|
logger.Information("Received pong (rtt: {RoundTripTime} ms).", (long) roundTripTime.TotalMilliseconds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal void ReceivePong(PongFrame frame) {
|
||||||
|
pongTask?.TrySetResult(frame.PingTime);
|
||||||
|
}
|
||||||
|
|
||||||
internal void ReceiveReply(ReplyFrame frame) {
|
internal void ReceiveReply(ReplyFrame frame) {
|
||||||
messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply);
|
messageReplyTracker.ReceiveReply(frame.ReplyingToMessageId, frame.SerializedReply);
|
||||||
}
|
}
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
|
|
||||||
public interface IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> {
|
public interface IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> {
|
||||||
ActorRef<TClientToServerMessage> Register(Guid sessionId, RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> connection);
|
IMessageReceiver<TClientToServerMessage> Register(RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> connection);
|
||||||
}
|
}
|
||||||
|
@@ -18,7 +18,6 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
) {
|
) {
|
||||||
private readonly ILogger logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
private readonly ILogger logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>>(loggerName);
|
||||||
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions = new (loggerName, connectionParameters.Common, messageDefinitions.ToClient);
|
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions = new (loggerName, connectionParameters.Common, messageDefinitions.ToClient);
|
||||||
|
|
||||||
private readonly List<Client> clients = [];
|
private readonly List<Client> clients = [];
|
||||||
|
|
||||||
public async Task<bool> Run(CancellationToken shutdownToken) {
|
public async Task<bool> Run(CancellationToken shutdownToken) {
|
||||||
@@ -87,14 +86,25 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private sealed class Client {
|
private sealed class Client {
|
||||||
private static readonly TimeSpan DisconnectTimeout = TimeSpan.FromSeconds(10);
|
private static TimeSpan DisconnectTimeout => TimeSpan.FromSeconds(10);
|
||||||
|
|
||||||
|
private static string GetAddressDescriptor(Socket socket) {
|
||||||
|
EndPoint? endPoint = socket.RemoteEndPoint;
|
||||||
|
|
||||||
|
return endPoint switch {
|
||||||
|
IPEndPoint ip => ip.Port.ToString(),
|
||||||
|
null => "{unknown}",
|
||||||
|
_ => "{" + endPoint + "}",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
public Task Task { get; }
|
public Task Task { get; }
|
||||||
|
|
||||||
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
private string Address => socket.RemoteEndPoint?.ToString() ?? "<unknown address>";
|
||||||
|
|
||||||
private readonly string loggerName;
|
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
|
private readonly string serverLoggerName;
|
||||||
|
|
||||||
private readonly IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions;
|
private readonly IMessageDefinitions<TClientToServerMessage, TServerToClientMessage> messageDefinitions;
|
||||||
private readonly IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar;
|
private readonly IRpcServerClientRegistrar<TClientToServerMessage, TServerToClientMessage> clientRegistrar;
|
||||||
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions;
|
private readonly RpcServerClientSessions<TServerToClientMessage> clientSessions;
|
||||||
@@ -113,8 +123,9 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
AuthToken authToken,
|
AuthToken authToken,
|
||||||
CancellationToken shutdownToken
|
CancellationToken shutdownToken
|
||||||
) {
|
) {
|
||||||
this.loggerName = PhantomLogger.ConcatNames(serverLoggerName, Address);
|
this.logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>, Client>(PhantomLogger.ConcatNames(serverLoggerName, GetAddressDescriptor(socket)));
|
||||||
this.logger = PhantomLogger.Create<RpcServer<TClientToServerMessage, TServerToClientMessage>, Client>(loggerName);
|
this.serverLoggerName = serverLoggerName;
|
||||||
|
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
this.clientRegistrar = clientRegistrar;
|
this.clientRegistrar = clientRegistrar;
|
||||||
this.clientSessions = clientSessions;
|
this.clientSessions = clientSessions;
|
||||||
@@ -122,6 +133,7 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
this.sslOptions = sslOptions;
|
this.sslOptions = sslOptions;
|
||||||
this.authToken = authToken;
|
this.authToken = authToken;
|
||||||
this.shutdownToken = shutdownToken;
|
this.shutdownToken = shutdownToken;
|
||||||
|
|
||||||
this.Task = Run();
|
this.Task = Run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,19 +150,9 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sessionIdResult.HasValue) {
|
if (sessionIdResult.HasValue) {
|
||||||
return;
|
logger.Information("Client connected.");
|
||||||
}
|
await RunConnectedSession(sessionIdResult.Value, stream);
|
||||||
|
|
||||||
Guid sessionId = sessionIdResult.Value;
|
|
||||||
logger.Information("Client connected.");
|
|
||||||
|
|
||||||
var session = clientSessions.OnConnected(sessionId, stream);
|
|
||||||
try {
|
|
||||||
var connection = new RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage>(loggerName, clientSessions, sessionId, stream, session, messageDefinitions.ToServer);
|
|
||||||
await connection.Listen(clientRegistrar.Register(sessionId, connection));
|
|
||||||
} finally {
|
|
||||||
clientSessions.OnDisconnected(sessionId);
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
logger.Information("Disconnecting client...");
|
logger.Information("Disconnecting client...");
|
||||||
@@ -215,5 +217,16 @@ public sealed class RpcServer<TClientToServerMessage, TServerToClientMessage>(
|
|||||||
var sessionId = await Serialization.ReadGuid(stream, cancellationToken);
|
var sessionId = await Serialization.ReadGuid(stream, cancellationToken);
|
||||||
return Either.Left(sessionId);
|
return Either.Left(sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task RunConnectedSession(Guid sessionId, Stream stream) {
|
||||||
|
var loggerName = PhantomLogger.ConcatNames(serverLoggerName, clientSessions.NextLoggerName(sessionId));
|
||||||
|
var session = clientSessions.OnConnected(sessionId, stream);
|
||||||
|
try {
|
||||||
|
var connection = new RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage>(loggerName, clientSessions, sessionId, messageDefinitions.ToServer, stream, session);
|
||||||
|
await connection.Listen(clientRegistrar.Register(connection));
|
||||||
|
} finally {
|
||||||
|
clientSessions.OnDisconnected(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -6,8 +6,14 @@ namespace Phantom.Utils.Rpc.Runtime.Server;
|
|||||||
|
|
||||||
sealed class RpcServerClientSessions<TServerToClientMessage>(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
sealed class RpcServerClientSessions<TServerToClientMessage>(string loggerName, RpcCommonConnectionParameters connectionParameters, MessageRegistry<TServerToClientMessage> messageRegistry) {
|
||||||
private readonly ConcurrentDictionary<Guid, RpcServerClientSession<TServerToClientMessage>> sessionsById = new ();
|
private readonly ConcurrentDictionary<Guid, RpcServerClientSession<TServerToClientMessage>> sessionsById = new ();
|
||||||
|
private readonly ConcurrentDictionary<Guid, uint> sessionLoggerSequenceIds = new ();
|
||||||
|
|
||||||
internal RpcSendChannel<TServerToClientMessage> OnConnected(Guid sessionId, Stream stream) {
|
public string NextLoggerName(Guid sessionId) {
|
||||||
|
string name = PhantomLogger.ShortenGuid(sessionId);
|
||||||
|
return name + "/" + sessionLoggerSequenceIds.AddOrUpdate(sessionId, static _ => 1, static (_, prev) => prev + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcSendChannel<TServerToClientMessage> OnConnected(Guid sessionId, Stream stream) {
|
||||||
var session = sessionsById.GetOrAdd(sessionId, CreateSession);
|
var session = sessionsById.GetOrAdd(sessionId, CreateSession);
|
||||||
session.OnConnected(stream);
|
session.OnConnected(stream);
|
||||||
return session.SendChannel;
|
return session.SendChannel;
|
||||||
|
@@ -1,6 +1,4 @@
|
|||||||
using Phantom.Utils.Actor;
|
using Phantom.Utils.Rpc.Frame;
|
||||||
using Phantom.Utils.Logging;
|
|
||||||
using Phantom.Utils.Rpc.Frame;
|
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
namespace Phantom.Utils.Rpc.Runtime.Server;
|
namespace Phantom.Utils.Rpc.Runtime.Server;
|
||||||
@@ -8,47 +6,33 @@ namespace Phantom.Utils.Rpc.Runtime.Server;
|
|||||||
public sealed class RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> {
|
public sealed class RpcServerToClientConnection<TClientToServerMessage, TServerToClientMessage> {
|
||||||
private readonly string loggerName;
|
private readonly string loggerName;
|
||||||
private readonly RpcServerClientSessions<TServerToClientMessage> sessions;
|
private readonly RpcServerClientSessions<TServerToClientMessage> sessions;
|
||||||
private readonly Guid sessionId;
|
|
||||||
private readonly Stream stream;
|
|
||||||
private readonly MessageRegistry<TClientToServerMessage> messageRegistry;
|
private readonly MessageRegistry<TClientToServerMessage> messageRegistry;
|
||||||
|
private readonly Stream stream;
|
||||||
|
|
||||||
|
public Guid SessionId { get; }
|
||||||
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
public RpcSendChannel<TServerToClientMessage> SendChannel { get; }
|
||||||
|
|
||||||
internal RpcServerToClientConnection(string parentLoggerName, RpcServerClientSessions<TServerToClientMessage> sessions, Guid sessionId, Stream stream, RpcSendChannel<TServerToClientMessage> sendChannel, MessageRegistry<TClientToServerMessage> messageRegistry) {
|
internal RpcServerToClientConnection(string loggerName, RpcServerClientSessions<TServerToClientMessage> sessions, Guid sessionId, MessageRegistry<TClientToServerMessage> messageRegistry, Stream stream, RpcSendChannel<TServerToClientMessage> sendChannel) {
|
||||||
this.loggerName = PhantomLogger.ConcatNames(parentLoggerName, sessionId.ToString());
|
this.loggerName = loggerName;
|
||||||
this.sessions = sessions;
|
this.sessions = sessions;
|
||||||
this.sessionId = sessionId;
|
|
||||||
this.stream = stream;
|
|
||||||
this.messageRegistry = messageRegistry;
|
this.messageRegistry = messageRegistry;
|
||||||
|
this.stream = stream;
|
||||||
|
|
||||||
|
this.SessionId = sessionId;
|
||||||
this.SendChannel = sendChannel;
|
this.SendChannel = sendChannel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task CloseSession() {
|
public Task CloseSession() {
|
||||||
return sessions.CloseSession(sessionId);
|
return sessions.CloseSession(SessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task Listen(ActorRef<TClientToServerMessage> actor) {
|
internal async Task Listen(IMessageReceiver<TClientToServerMessage> receiver) {
|
||||||
var handler = new MessageHandlerImpl(SendChannel, actor);
|
var messageHandler = new RpcMessageHandler<TClientToServerMessage>(receiver, SendChannel);
|
||||||
|
var frameReader = new RpcFrameReader<TServerToClientMessage, TClientToServerMessage>(loggerName, messageRegistry, messageHandler, SendChannel);
|
||||||
try {
|
try {
|
||||||
await Receive(stream, handler);
|
await IFrame.ReadFrom(stream, frameReader, CancellationToken.None);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Receive(Stream stream, MessageHandlerImpl handler) {
|
|
||||||
await IFrame.ReadFrom(stream, new RpcFrameReader<TServerToClientMessage, TClientToServerMessage>(loggerName, messageRegistry, handler, SendChannel), CancellationToken.None);
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed class MessageHandlerImpl(RpcSendChannel<TServerToClientMessage> sendChannel, ActorRef<TClientToServerMessage> actor) : MessageHandler<TClientToServerMessage> {
|
|
||||||
public ActorRef<TClientToServerMessage> Actor => actor;
|
|
||||||
|
|
||||||
public ValueTask OnReply<TMessage, TReply>(uint messageId, TReply reply, CancellationToken cancellationToken) where TMessage : TClientToServerMessage, ICanReply<TReply> {
|
|
||||||
return sendChannel.SendReply(messageId, reply, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ValueTask OnError(uint messageId, RpcError error, CancellationToken cancellationToken) {
|
|
||||||
return sendChannel.SendError(messageId, error, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@@ -1,135 +0,0 @@
|
|||||||
// using System.Collections.Concurrent;
|
|
||||||
// using Akka.Actor;
|
|
||||||
// using NetMQ.Sockets;
|
|
||||||
// using Phantom.Utils.Actor;
|
|
||||||
// using Phantom.Utils.Logging;
|
|
||||||
// using Serilog;
|
|
||||||
// using Serilog.Events;
|
|
||||||
//
|
|
||||||
// namespace Phantom.Utils.Rpc.Runtime2;
|
|
||||||
//
|
|
||||||
// public static class RpcServerRuntime {
|
|
||||||
// public static Task Launch<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(
|
|
||||||
// RpcConfiguration config,
|
|
||||||
// IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions,
|
|
||||||
// IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler,
|
|
||||||
// IActorRefFactory actorSystem,
|
|
||||||
// CancellationToken cancellationToken
|
|
||||||
// ) where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
|
||||||
// return RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Launch(config, messageDefinitions, registrationHandler, actorSystem, cancellationToken);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// internal sealed class RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage> : RpcRuntime<ServerSocket> where TRegistrationMessage : TServerMessage where TReplyMessage : TClientMessage, TServerMessage {
|
|
||||||
// internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) {
|
|
||||||
// var socket = RpcServerSocket.Connect(config);
|
|
||||||
// return new RpcServerRuntime<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>(socket, messageDefinitions, registrationHandler, actorSystem, cancellationToken).Launch();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private readonly string serviceName;
|
|
||||||
// private readonly IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions;
|
|
||||||
// private readonly IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler;
|
|
||||||
// private readonly IActorRefFactory actorSystem;
|
|
||||||
// private readonly CancellationToken cancellationToken;
|
|
||||||
//
|
|
||||||
// private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler, IActorRefFactory actorSystem, CancellationToken cancellationToken) : base(socket) {
|
|
||||||
// this.serviceName = socket.Config.ServiceName;
|
|
||||||
// this.messageDefinitions = messageDefinitions;
|
|
||||||
// this.registrationHandler = registrationHandler;
|
|
||||||
// this.actorSystem = actorSystem;
|
|
||||||
// this.cancellationToken = cancellationToken;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private protected override Task Run(ServerSocket socket) {
|
|
||||||
// var clients = new ConcurrentDictionary<ulong, Client>();
|
|
||||||
//
|
|
||||||
// void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
|
||||||
// if (clients.Remove(e.RoutingId, out var client)) {
|
|
||||||
// client.Connection.Closed -= OnConnectionClosed;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// while (!cancellationToken.IsCancellationRequested) {
|
|
||||||
// var (routingId, data) = socket.Receive(cancellationToken);
|
|
||||||
//
|
|
||||||
// if (data.Length == 0) {
|
|
||||||
// LogUnknownMessage(routingId, data);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// Type? messageType = messageDefinitions.ToServer.TryGetType(data, out var type) ? type : null;
|
|
||||||
// if (messageType == null) {
|
|
||||||
// LogUnknownMessage(routingId, data);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (!clients.TryGetValue(routingId, out var client)) {
|
|
||||||
// if (messageType != typeof(TRegistrationMessage)) {
|
|
||||||
// RuntimeLogger.Warning("Received {MessageType} ({Bytes} B) from unregistered client {RoutingId}.", messageType.Name, data.Length, routingId);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// var clientLoggerName = LoggerName + ":" + routingId;
|
|
||||||
// var clientActorName = "Rpc-" + serviceName + "-" + routingId;
|
|
||||||
//
|
|
||||||
// // TODO add pings and tear down connection after too much inactivity
|
|
||||||
// var connection = new RpcConnectionToClient<TClientMessage>(socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
|
||||||
// connection.Closed += OnConnectionClosed;
|
|
||||||
//
|
|
||||||
// client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, registrationHandler);
|
|
||||||
// clients[routingId] = client;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// client.Enqueue(messageType, data);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// foreach (var client in clients.Values) {
|
|
||||||
// client.Connection.Close();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return Task.CompletedTask;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void LogUnknownMessage(uint routingId, ReadOnlyMemory<byte> data) {
|
|
||||||
// RuntimeLogger.Warning("Received unknown message ({Bytes} B) from {RoutingId}.", data.Length, routingId);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private protected override Task Disconnect(ServerSocket socket) {
|
|
||||||
// return Task.CompletedTask;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private sealed class Client {
|
|
||||||
// public RpcConnectionToClient<TClientMessage> Connection { get; }
|
|
||||||
//
|
|
||||||
// private readonly ILogger logger;
|
|
||||||
// private readonly ActorRef<RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand> receiverActor;
|
|
||||||
//
|
|
||||||
// public Client(string loggerName, string actorName, RpcConnectionToClient<TClientMessage> connection, IActorRefFactory actorSystem, IMessageDefinitions<TClientMessage, TServerMessage, TReplyMessage> messageDefinitions, IRegistrationHandler<TClientMessage, TServerMessage, TRegistrationMessage> registrationHandler) {
|
|
||||||
// this.Connection = connection;
|
|
||||||
// this.Connection.Closed += OnConnectionClosed;
|
|
||||||
//
|
|
||||||
// this.logger = PhantomLogger.Create(loggerName);
|
|
||||||
//
|
|
||||||
// var receiverActorInit = new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Init(loggerName, messageDefinitions, registrationHandler, Connection);
|
|
||||||
// this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.Factory(receiverActorInit), actorName + "-Receiver");
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
|
||||||
// LogMessageType(messageType, data);
|
|
||||||
// receiverActor.Tell(new RpcReceiverActor<TClientMessage, TServerMessage, TRegistrationMessage, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
|
||||||
// if (logger.IsEnabled(LogEventLevel.Verbose)) {
|
|
||||||
// logger.Verbose("Received {MessageType} ({Bytes} B).", messageType.Name, data.Length);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private void OnConnectionClosed(object? sender, RpcClientConnectionClosedEventArgs e) {
|
|
||||||
// Connection.Closed -= OnConnectionClosed;
|
|
||||||
//
|
|
||||||
// logger.Debug("Closing connection...");
|
|
||||||
// receiverActor.Stop();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
@@ -52,6 +52,14 @@ static class Serialization {
|
|||||||
return ReadValue(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
|
return ReadValue(BinaryPrimitives.ReadUInt32LittleEndian, sizeof(uint), stream, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ValueTask WriteSignedLong(long value, Stream stream, CancellationToken cancellationToken) {
|
||||||
|
return WriteValue(value, sizeof(long), BinaryPrimitives.WriteInt64LittleEndian, stream, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ValueTask<long> ReadSignedLong(Stream stream, CancellationToken cancellationToken) {
|
||||||
|
return ReadValue(BinaryPrimitives.ReadInt64LittleEndian, sizeof(long), stream, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
public static ValueTask WriteGuid(Guid guid, Stream stream, CancellationToken cancellationToken) {
|
public static ValueTask WriteGuid(Guid guid, Stream stream, CancellationToken cancellationToken) {
|
||||||
static void Write(Span<byte> span, Guid guid) {
|
static void Write(Span<byte> span, Guid guid) {
|
||||||
if (!guid.TryWriteBytes(span)) {
|
if (!guid.TryWriteBytes(span)) {
|
||||||
|
@@ -14,7 +14,7 @@ namespace Phantom.Web.Services;
|
|||||||
public static class PhantomWebServices {
|
public static class PhantomWebServices {
|
||||||
public static void AddPhantomServices(this IServiceCollection services) {
|
public static void AddPhantomServices(this IServiceCollection services) {
|
||||||
services.AddSingleton<ControllerConnection>();
|
services.AddSingleton<ControllerConnection>();
|
||||||
services.AddSingleton<ControllerMessageHandlerFactory>();
|
services.AddSingleton<ControllerMessageHandlerActorInitFactory>();
|
||||||
|
|
||||||
services.AddSingleton<AgentManager>();
|
services.AddSingleton<AgentManager>();
|
||||||
services.AddSingleton<InstanceManager>();
|
services.AddSingleton<InstanceManager>();
|
||||||
|
@@ -7,13 +7,12 @@ using Phantom.Web.Services.Instances;
|
|||||||
|
|
||||||
namespace Phantom.Web.Services.Rpc;
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
|
public sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
|
||||||
public readonly record struct Init(
|
public readonly record struct Init(
|
||||||
AgentManager AgentManager,
|
AgentManager AgentManager,
|
||||||
InstanceManager InstanceManager,
|
InstanceManager InstanceManager,
|
||||||
InstanceLogManager InstanceLogManager,
|
InstanceLogManager InstanceLogManager,
|
||||||
UserSessionRefreshManager UserSessionRefreshManager,
|
UserSessionRefreshManager UserSessionRefreshManager
|
||||||
TaskCompletionSource<bool> RegisterSuccessWaiter
|
|
||||||
);
|
);
|
||||||
|
|
||||||
public static Props<IMessageToWeb> Factory(Init init) {
|
public static Props<IMessageToWeb> Factory(Init init) {
|
||||||
@@ -24,26 +23,19 @@ sealed class ControllerMessageHandlerActor : ReceiveActor<IMessageToWeb> {
|
|||||||
private readonly InstanceManager instanceManager;
|
private readonly InstanceManager instanceManager;
|
||||||
private readonly InstanceLogManager instanceLogManager;
|
private readonly InstanceLogManager instanceLogManager;
|
||||||
private readonly UserSessionRefreshManager userSessionRefreshManager;
|
private readonly UserSessionRefreshManager userSessionRefreshManager;
|
||||||
private readonly TaskCompletionSource<bool> registerSuccessWaiter;
|
|
||||||
|
|
||||||
private ControllerMessageHandlerActor(Init init) {
|
private ControllerMessageHandlerActor(Init init) {
|
||||||
this.agentManager = init.AgentManager;
|
this.agentManager = init.AgentManager;
|
||||||
this.instanceManager = init.InstanceManager;
|
this.instanceManager = init.InstanceManager;
|
||||||
this.instanceLogManager = init.InstanceLogManager;
|
this.instanceLogManager = init.InstanceLogManager;
|
||||||
this.userSessionRefreshManager = init.UserSessionRefreshManager;
|
this.userSessionRefreshManager = init.UserSessionRefreshManager;
|
||||||
this.registerSuccessWaiter = init.RegisterSuccessWaiter;
|
|
||||||
|
|
||||||
Receive<RegisterWebResultMessage>(HandleRegisterWebResult);
|
|
||||||
Receive<RefreshAgentsMessage>(HandleRefreshAgents);
|
Receive<RefreshAgentsMessage>(HandleRefreshAgents);
|
||||||
Receive<RefreshInstancesMessage>(HandleRefreshInstances);
|
Receive<RefreshInstancesMessage>(HandleRefreshInstances);
|
||||||
Receive<InstanceOutputMessage>(HandleInstanceOutput);
|
Receive<InstanceOutputMessage>(HandleInstanceOutput);
|
||||||
Receive<RefreshUserSessionMessage>(HandleRefreshUserSession);
|
Receive<RefreshUserSessionMessage>(HandleRefreshUserSession);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleRegisterWebResult(RegisterWebResultMessage message) {
|
|
||||||
registerSuccessWaiter.TrySetResult(message.Success);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void HandleRefreshAgents(RefreshAgentsMessage message) {
|
private void HandleRefreshAgents(RefreshAgentsMessage message) {
|
||||||
agentManager.RefreshAgents(message.Agents);
|
agentManager.RefreshAgents(message.Agents);
|
||||||
}
|
}
|
||||||
|
@@ -0,0 +1,16 @@
|
|||||||
|
using Phantom.Web.Services.Agents;
|
||||||
|
using Phantom.Web.Services.Authentication;
|
||||||
|
using Phantom.Web.Services.Instances;
|
||||||
|
|
||||||
|
namespace Phantom.Web.Services.Rpc;
|
||||||
|
|
||||||
|
public sealed class ControllerMessageHandlerActorInitFactory(
|
||||||
|
AgentManager agentManager,
|
||||||
|
InstanceManager instanceManager,
|
||||||
|
InstanceLogManager instanceLogManager,
|
||||||
|
UserSessionRefreshManager userSessionRefreshManager
|
||||||
|
) {
|
||||||
|
public ControllerMessageHandlerActor.Init Create() {
|
||||||
|
return new ControllerMessageHandlerActor.Init(agentManager, instanceManager, instanceLogManager, userSessionRefreshManager);
|
||||||
|
}
|
||||||
|
}
|
@@ -1,28 +0,0 @@
|
|||||||
using Akka.Actor;
|
|
||||||
using Phantom.Common.Messages.Web;
|
|
||||||
using Phantom.Utils.Actor;
|
|
||||||
using Phantom.Utils.Tasks;
|
|
||||||
using Phantom.Web.Services.Agents;
|
|
||||||
using Phantom.Web.Services.Authentication;
|
|
||||||
using Phantom.Web.Services.Instances;
|
|
||||||
|
|
||||||
namespace Phantom.Web.Services.Rpc;
|
|
||||||
|
|
||||||
public sealed class ControllerMessageHandlerFactory(
|
|
||||||
AgentManager agentManager,
|
|
||||||
InstanceManager instanceManager,
|
|
||||||
InstanceLogManager instanceLogManager,
|
|
||||||
UserSessionRefreshManager userSessionRefreshManager
|
|
||||||
) {
|
|
||||||
private readonly TaskCompletionSource<bool> registerSuccessWaiter = AsyncTasks.CreateCompletionSource<bool>();
|
|
||||||
|
|
||||||
public Task<bool> RegisterSuccessWaiter => registerSuccessWaiter.Task;
|
|
||||||
|
|
||||||
private int messageHandlerId = 0;
|
|
||||||
|
|
||||||
public ActorRef<IMessageToWeb> Create(IActorRefFactory actorSystem) {
|
|
||||||
var init = new ControllerMessageHandlerActor.Init(agentManager, instanceManager, instanceLogManager, userSessionRefreshManager, registerSuccessWaiter);
|
|
||||||
var name = "ControllerMessageHandler-" + Interlocked.Increment(ref messageHandlerId);
|
|
||||||
return actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(init), name);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -5,6 +5,7 @@ using Phantom.Utils.Actor;
|
|||||||
using Phantom.Utils.Cryptography;
|
using Phantom.Utils.Cryptography;
|
||||||
using Phantom.Utils.IO;
|
using Phantom.Utils.IO;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Runtime.Client;
|
using Phantom.Utils.Rpc.Runtime.Client;
|
||||||
using Phantom.Utils.Runtime;
|
using Phantom.Utils.Runtime;
|
||||||
using Phantom.Utils.Threading;
|
using Phantom.Utils.Threading;
|
||||||
@@ -71,11 +72,6 @@ try {
|
|||||||
|
|
||||||
using var actorSystem = ActorSystemFactory.Create("Web");
|
using var actorSystem = ActorSystemFactory.Create("Web");
|
||||||
|
|
||||||
ControllerMessageHandlerFactory messageHandlerFactory;
|
|
||||||
await using (var scope = webApplication.Services.CreateAsyncScope()) {
|
|
||||||
messageHandlerFactory = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerFactory>();
|
|
||||||
}
|
|
||||||
|
|
||||||
Task? rpcClientListener = null;
|
Task? rpcClientListener = null;
|
||||||
try {
|
try {
|
||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel web...");
|
||||||
@@ -83,11 +79,19 @@ try {
|
|||||||
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
|
PhantomLogger.Root.Information("For administrator setup, visit: {HttpUrl}{SetupPath}", webConfiguration.HttpUrl, webConfiguration.BasePath + "setup");
|
||||||
|
|
||||||
await WebLauncher.Launch(webConfiguration, webApplication);
|
await WebLauncher.Launch(webConfiguration, webApplication);
|
||||||
rpcClientListener = rpcClient.Listen(messageHandlerFactory.Create(actorSystem));
|
|
||||||
|
ActorRef<IMessageToWeb> rpcMessageHandlerActor;
|
||||||
|
await using (var scope = webApplication.Services.CreateAsyncScope()) {
|
||||||
|
var rpcMessageHandlerInit = scope.ServiceProvider.GetRequiredService<ControllerMessageHandlerActorInitFactory>().Create();
|
||||||
|
rpcMessageHandlerActor = actorSystem.ActorOf(ControllerMessageHandlerActor.Factory(rpcMessageHandlerInit), "ControllerMessageHandler");
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcClientListener = rpcClient.Listen(new IMessageReceiver<IMessageToWeb>.Actor(rpcMessageHandlerActor));
|
||||||
|
|
||||||
PhantomLogger.Root.Information("Phantom Panel web is ready.");
|
PhantomLogger.Root.Information("Phantom Panel web is ready.");
|
||||||
|
|
||||||
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
await shutdownCancellationToken.WaitHandle.WaitOneAsync();
|
||||||
|
await webApplication.StopAsync();
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage());
|
await rpcClient.SendChannel.SendMessage(new UnregisterWebMessage());
|
||||||
|
@@ -62,7 +62,7 @@ static class WebLauncher {
|
|||||||
application.MapFallbackToPage("/_Host");
|
application.MapFallbackToPage("/_Host");
|
||||||
|
|
||||||
logger.Information("Starting Web server on port {Port}...", config.Port);
|
logger.Information("Starting Web server on port {Port}...", config.Port);
|
||||||
return application.RunAsync(config.CancellationToken);
|
return application.StartAsync(config.CancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class NullLifetime : IHostLifetime {
|
private sealed class NullLifetime : IHostLifetime {
|
||||||
|
Reference in New Issue
Block a user