mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2025-09-16 09:24:49 +02:00
Compare commits
1 Commits
wip-rpc
...
dcfe66a337
Author | SHA1 | Date | |
---|---|---|---|
dcfe66a337
|
@@ -17,6 +17,8 @@ using Phantom.Utils.Tasks;
|
|||||||
namespace Phantom.Controller.Services;
|
namespace Phantom.Controller.Services;
|
||||||
|
|
||||||
public sealed class ControllerServices : IAsyncDisposable {
|
public sealed class ControllerServices : IAsyncDisposable {
|
||||||
|
public ActorSystem ActorSystem { get; }
|
||||||
|
|
||||||
private TaskManager TaskManager { get; }
|
private TaskManager TaskManager { get; }
|
||||||
private ControllerState ControllerState { get; }
|
private ControllerState ControllerState { get; }
|
||||||
private MinecraftVersions MinecraftVersions { get; }
|
private MinecraftVersions MinecraftVersions { get; }
|
||||||
@@ -32,8 +34,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
|||||||
private UserRoleManager UserRoleManager { get; }
|
private UserRoleManager UserRoleManager { get; }
|
||||||
private UserLoginManager UserLoginManager { get; }
|
private UserLoginManager UserLoginManager { get; }
|
||||||
private AuditLogManager AuditLogManager { get; }
|
private AuditLogManager AuditLogManager { get; }
|
||||||
|
|
||||||
private readonly ActorSystem actorSystem;
|
|
||||||
private readonly IDbContextProvider dbProvider;
|
private readonly IDbContextProvider dbProvider;
|
||||||
private readonly AuthToken webAuthToken;
|
private readonly AuthToken webAuthToken;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
@@ -43,13 +44,13 @@ public sealed class ControllerServices : IAsyncDisposable {
|
|||||||
this.webAuthToken = webAuthToken;
|
this.webAuthToken = webAuthToken;
|
||||||
this.cancellationToken = shutdownCancellationToken;
|
this.cancellationToken = shutdownCancellationToken;
|
||||||
|
|
||||||
this.actorSystem = ActorSystemFactory.Create("Controller");
|
this.ActorSystem = ActorSystemFactory.Create("Controller");
|
||||||
|
|
||||||
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
|
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, ControllerServices>());
|
||||||
this.ControllerState = new ControllerState();
|
this.ControllerState = new ControllerState();
|
||||||
this.MinecraftVersions = new MinecraftVersions();
|
this.MinecraftVersions = new MinecraftVersions();
|
||||||
|
|
||||||
this.AgentManager = new AgentManager(actorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
this.AgentManager = new AgentManager(ActorSystem, agentAuthToken, ControllerState, MinecraftVersions, dbProvider, cancellationToken);
|
||||||
this.InstanceLogManager = new InstanceLogManager();
|
this.InstanceLogManager = new InstanceLogManager();
|
||||||
|
|
||||||
this.UserManager = new UserManager(dbProvider);
|
this.UserManager = new UserManager(dbProvider);
|
||||||
@@ -67,7 +68,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
|
public WebMessageListener CreateWebMessageListener(RpcConnectionToClient<IMessageToWebListener> connection) {
|
||||||
return new WebMessageListener(actorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
return new WebMessageListener(ActorSystem, connection, webAuthToken, ControllerState, UserManager, RoleManager, UserRoleManager, UserLoginManager, AuditLogManager, AgentManager, InstanceLogManager, MinecraftVersions, EventLogManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Initialize() {
|
public async Task Initialize() {
|
||||||
@@ -78,7 +79,7 @@ public sealed class ControllerServices : IAsyncDisposable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync() {
|
public async ValueTask DisposeAsync() {
|
||||||
await actorSystem.Terminate();
|
await ActorSystem.Terminate();
|
||||||
actorSystem.Dispose();
|
ActorSystem.Dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -54,24 +54,23 @@ try {
|
|||||||
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
|
PhantomLogger.Root.InformationHeading("Launching Phantom Panel server...");
|
||||||
|
|
||||||
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
var dbContextFactory = new ApplicationDbContextFactory(sqlConnectionString);
|
||||||
|
|
||||||
await using (var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken)) {
|
|
||||||
await controllerServices.Initialize();
|
|
||||||
|
|
||||||
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
await using var controllerServices = new ControllerServices(dbContextFactory, agentKeyData.AuthToken, webKeyData.AuthToken, shutdownCancellationToken);
|
||||||
return new RpcConfiguration("Rpc:" + serviceName, host, port, connectionKey.Certificate);
|
await controllerServices.Initialize();
|
||||||
}
|
|
||||||
|
|
||||||
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
static RpcConfiguration ConfigureRpc(string serviceName, string host, ushort port, ConnectionKeyData connectionKey) {
|
||||||
try {
|
return new RpcConfiguration(serviceName, host, port, connectionKey.Certificate);
|
||||||
await Task.WhenAll(
|
}
|
||||||
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, shutdownCancellationToken),
|
|
||||||
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, shutdownCancellationToken)
|
var rpcTaskManager = new TaskManager(PhantomLogger.Create<TaskManager>("Rpc"));
|
||||||
);
|
try {
|
||||||
} finally {
|
await Task.WhenAll(
|
||||||
await rpcTaskManager.Stop();
|
RpcServerRuntime.Launch(ConfigureRpc("Agent", agentRpcServerHost, agentRpcServerPort, agentKeyData), AgentMessageRegistries.Definitions, controllerServices.CreateAgentMessageListener, controllerServices.ActorSystem, shutdownCancellationToken),
|
||||||
NetMQConfig.Cleanup();
|
RpcServerRuntime.Launch(ConfigureRpc("Web", webRpcServerHost, webRpcServerPort, webKeyData), WebMessageRegistries.Definitions, controllerServices.CreateWebMessageListener, controllerServices.ActorSystem, shutdownCancellationToken)
|
||||||
}
|
);
|
||||||
|
} finally {
|
||||||
|
await rpcTaskManager.Stop();
|
||||||
|
NetMQConfig.Cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@@ -5,6 +5,7 @@ namespace Phantom.Utils.Actor;
|
|||||||
public readonly struct ActorConfiguration {
|
public readonly struct ActorConfiguration {
|
||||||
public SupervisorStrategy? SupervisorStrategy { get; init; }
|
public SupervisorStrategy? SupervisorStrategy { get; init; }
|
||||||
public string? MailboxType { get; init; }
|
public string? MailboxType { get; init; }
|
||||||
|
public int? StashCapacity { get; init; }
|
||||||
|
|
||||||
internal Props Apply(Props props) {
|
internal Props Apply(Props props) {
|
||||||
if (SupervisorStrategy != null) {
|
if (SupervisorStrategy != null) {
|
||||||
@@ -14,6 +15,10 @@ public readonly struct ActorConfiguration {
|
|||||||
if (MailboxType != null) {
|
if (MailboxType != null) {
|
||||||
props = props.WithMailbox(MailboxType);
|
props = props.WithMailbox(MailboxType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (StashCapacity != null) {
|
||||||
|
props = props.WithStashCapacity(StashCapacity.Value);
|
||||||
|
}
|
||||||
|
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
@@ -28,4 +28,8 @@ public readonly struct ActorRef<TMessage> {
|
|||||||
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
|
public Task<TReply> Request<TReply>(ICanReply<TReply> message, CancellationToken cancellationToken = default) {
|
||||||
return Request(message, timeout: null, cancellationToken);
|
return Request(message, timeout: null, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task<bool> Stop(TimeSpan? timeout = null) {
|
||||||
|
return actorRef.GracefulStop(timeout ?? Timeout.InfiniteTimeSpan);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -12,6 +12,7 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Phantom.Utils.Actor\Phantom.Utils.Actor.csproj" />
|
||||||
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
<ProjectReference Include="..\Phantom.Utils\Phantom.Utils.csproj" />
|
||||||
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
|
<ProjectReference Include="..\Phantom.Utils.Logging\Phantom.Utils.Logging.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
namespace Phantom.Utils.Rpc;
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
public sealed record RpcConfiguration(string LoggerName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
public sealed record RpcConfiguration(string ServiceName, string Host, ushort Port, NetMQCertificate ServerCertificate) {
|
||||||
|
public string LoggerName => "Rpc:" + ServiceName;
|
||||||
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
public string TcpUrl => "tcp://" + Host + ":" + Port;
|
||||||
}
|
}
|
||||||
|
64
Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
Normal file
64
Utils/Phantom.Utils.Rpc/RpcReceiverActor.cs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Event;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
using Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
|
namespace Phantom.Utils.Rpc;
|
||||||
|
|
||||||
|
sealed class RpcReceiverActor<TClientListener, TServerListener, TReplyMessage> : ReceiveActor<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand>, IWithStash where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
|
public readonly record struct Init(IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> MessageDefinitions, MessageHandler<TServerListener> MessageHandler, RpcConnectionToClient<TClientListener> Connection);
|
||||||
|
|
||||||
|
public static Props<ICommand> Factory(Init init) {
|
||||||
|
return Props<ICommand>.Create(() => new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>(init), new ActorConfiguration {
|
||||||
|
SupervisorStrategy = SupervisorStrategies.Resume,
|
||||||
|
StashCapacity = 100
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public IStash Stash { get; set; } = null!;
|
||||||
|
|
||||||
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
|
private readonly MessageHandler<TServerListener> messageHandler;
|
||||||
|
private readonly RpcConnectionToClient<TClientListener> connection;
|
||||||
|
|
||||||
|
private RpcReceiverActor(Init init) {
|
||||||
|
this.messageDefinitions = init.MessageDefinitions;
|
||||||
|
this.messageHandler = init.MessageHandler;
|
||||||
|
this.connection = init.Connection;
|
||||||
|
|
||||||
|
ReceiveAsync<ReceiveMessageCommand>(ReceiveMessageUnauthorized);
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface ICommand {}
|
||||||
|
|
||||||
|
public sealed record ReceiveMessageCommand(Type MessageType, ReadOnlyMemory<byte> Data) : ICommand;
|
||||||
|
|
||||||
|
private async Task ReceiveMessageUnauthorized(ReceiveMessageCommand command) {
|
||||||
|
if (messageDefinitions.IsRegistrationMessage(command.MessageType)) {
|
||||||
|
Handle(command.Data);
|
||||||
|
|
||||||
|
if (await connection.GetAuthorization()) {
|
||||||
|
Stash.UnstashAll();
|
||||||
|
|
||||||
|
Become(() => {
|
||||||
|
Receive<ReceiveMessageCommand>(ReceiveMessageAuthorized);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (Stash.IsFull) {
|
||||||
|
Context.GetLogger().Warning("Stash is full, dropping message: {MessageType}", command.MessageType);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Stash.Stash();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ReceiveMessageAuthorized(ReceiveMessageCommand command) {
|
||||||
|
Handle(command.Data);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Handle(ReadOnlyMemory<byte> data) {
|
||||||
|
messageDefinitions.ToServer.Handle(data, messageHandler);
|
||||||
|
}
|
||||||
|
}
|
@@ -1,5 +1,7 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
|
using Akka.Actor;
|
||||||
using NetMQ.Sockets;
|
using NetMQ.Sockets;
|
||||||
|
using Phantom.Utils.Actor;
|
||||||
using Phantom.Utils.Logging;
|
using Phantom.Utils.Logging;
|
||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
using Phantom.Utils.Rpc.Sockets;
|
using Phantom.Utils.Rpc.Sockets;
|
||||||
@@ -9,25 +11,29 @@ using Serilog.Events;
|
|||||||
namespace Phantom.Utils.Rpc.Runtime;
|
namespace Phantom.Utils.Rpc.Runtime;
|
||||||
|
|
||||||
public static class RpcServerRuntime {
|
public static class RpcServerRuntime {
|
||||||
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
public static Task Launch<TClientListener, TServerListener, TReplyMessage>(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, cancellationToken);
|
return RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>.Launch(config, messageDefinitions, listenerFactory, actorSystem, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyMessage> : RpcRuntime<ServerSocket> where TReplyMessage : IMessage<TClientListener, NoReply>, IMessage<TServerListener, NoReply> {
|
||||||
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) {
|
internal static Task Launch(RpcConfiguration config, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) {
|
||||||
var socket = RpcServerSocket.Connect(config);
|
var socket = RpcServerSocket.Connect(config);
|
||||||
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, cancellationToken).Launch();
|
return new RpcServerRuntime<TClientListener, TServerListener, TReplyMessage>(socket, messageDefinitions, listenerFactory, actorSystem, cancellationToken).Launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private readonly string serviceName;
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
private readonly Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory;
|
||||||
|
private readonly ActorSystem actorSystem;
|
||||||
private readonly TaskManager taskManager;
|
private readonly TaskManager taskManager;
|
||||||
private readonly CancellationToken cancellationToken;
|
private readonly CancellationToken cancellationToken;
|
||||||
|
|
||||||
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, CancellationToken cancellationToken) : base(socket) {
|
private RpcServerRuntime(RpcServerSocket socket, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, Func<RpcConnectionToClient<TClientListener>, TServerListener> listenerFactory, ActorSystem actorSystem, CancellationToken cancellationToken) : base(socket) {
|
||||||
|
this.serviceName = socket.Config.ServiceName;
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
this.listenerFactory = listenerFactory;
|
this.listenerFactory = listenerFactory;
|
||||||
|
this.actorSystem = actorSystem;
|
||||||
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
|
this.taskManager = new TaskManager(PhantomLogger.Create<TaskManager>(socket.Config.LoggerName + ":Runtime"));
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
}
|
}
|
||||||
@@ -62,18 +68,17 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
}
|
}
|
||||||
|
|
||||||
var clientLoggerName = LoggerName + ":" + routingId;
|
var clientLoggerName = LoggerName + ":" + routingId;
|
||||||
var processingQueue = new RpcQueue(taskManager, "Process messages from " + routingId);
|
|
||||||
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
var connection = new RpcConnectionToClient<TClientListener>(clientLoggerName, socket, routingId, messageDefinitions.ToClient, ReplyTracker);
|
||||||
|
|
||||||
connection.Closed += OnConnectionClosed;
|
connection.Closed += OnConnectionClosed;
|
||||||
|
|
||||||
client = new Client(clientLoggerName, connection, processingQueue, messageDefinitions, listenerFactory(connection), taskManager);
|
var clientActorName = "RpcReceive-" + serviceName + "-" + routingId;
|
||||||
|
|
||||||
|
client = new Client(clientLoggerName, clientActorName, connection, actorSystem, messageDefinitions, listenerFactory(connection), taskManager);
|
||||||
clients[routingId] = client;
|
clients[routingId] = client;
|
||||||
client.EnqueueRegistrationMessage(messageType, data);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
client.Enqueue(messageType, data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client.Enqueue(messageType, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (var client in clients.Values) {
|
foreach (var client in clients.Values) {
|
||||||
@@ -94,27 +99,22 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
private sealed class Client : MessageHandler<TServerListener> {
|
private sealed class Client : MessageHandler<TServerListener> {
|
||||||
public RpcConnectionToClient<TClientListener> Connection { get; }
|
public RpcConnectionToClient<TClientListener> Connection { get; }
|
||||||
|
|
||||||
private readonly RpcQueue processingQueue;
|
private readonly ActorRef<RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ICommand> receiverActor;
|
||||||
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
private readonly IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions;
|
||||||
private readonly TaskManager taskManager;
|
private readonly TaskManager taskManager;
|
||||||
|
|
||||||
public Client(string loggerName, RpcConnectionToClient<TClientListener> connection, RpcQueue processingQueue, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
public Client(string loggerName, string actorName, RpcConnectionToClient<TClientListener> connection, ActorSystem actorSystem, IMessageDefinitions<TClientListener, TServerListener, TReplyMessage> messageDefinitions, TServerListener listener, TaskManager taskManager) : base(loggerName, listener) {
|
||||||
this.Connection = connection;
|
this.Connection = connection;
|
||||||
this.Connection.Closed += OnConnectionClosed;
|
this.Connection.Closed += OnConnectionClosed;
|
||||||
|
|
||||||
this.processingQueue = processingQueue;
|
this.receiverActor = actorSystem.ActorOf(RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Factory(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.Init(messageDefinitions, this, Connection)), actorName);
|
||||||
this.messageDefinitions = messageDefinitions;
|
this.messageDefinitions = messageDefinitions;
|
||||||
this.taskManager = taskManager;
|
this.taskManager = taskManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void EnqueueRegistrationMessage(Type messageType, ReadOnlyMemory<byte> data) {
|
|
||||||
LogMessageType(messageType, data);
|
|
||||||
processingQueue.Enqueue(() => Handle(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
internal void Enqueue(Type messageType, ReadOnlyMemory<byte> data) {
|
||||||
LogMessageType(messageType, data);
|
LogMessageType(messageType, data);
|
||||||
processingQueue.Enqueue(() => WaitForAuthorizationAndHandle(data));
|
receiverActor.Tell(new RpcReceiverActor<TClientListener, TServerListener, TReplyMessage>.ReceiveMessageCommand(messageType, data));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
private void LogMessageType(Type messageType, ReadOnlyMemory<byte> data) {
|
||||||
@@ -123,19 +123,6 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void Handle(ReadOnlyMemory<byte> data) {
|
|
||||||
messageDefinitions.ToServer.Handle(data, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task WaitForAuthorizationAndHandle(ReadOnlyMemory<byte> data) {
|
|
||||||
if (await Connection.GetAuthorization()) {
|
|
||||||
Handle(data);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Logger.Warning("Dropped message after failed registration.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
protected override Task SendReply(uint sequenceId, byte[] serializedReply) {
|
||||||
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
return Connection.Send(messageDefinitions.CreateReplyMessage(sequenceId, serializedReply));
|
||||||
}
|
}
|
||||||
@@ -147,7 +134,7 @@ internal sealed class RpcServerRuntime<TClientListener, TServerListener, TReplyM
|
|||||||
|
|
||||||
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
|
taskManager.Run("Closing connection to " + e.RoutingId, async () => {
|
||||||
await StopReceiving();
|
await StopReceiving();
|
||||||
await processingQueue.Stop();
|
await receiverActor.Stop();
|
||||||
await Connection.StopSending();
|
await Connection.StopSending();
|
||||||
Logger.Debug("Connection closed.");
|
Logger.Debug("Connection closed.");
|
||||||
});
|
});
|
||||||
|
Reference in New Issue
Block a user