1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2025-09-30 23:02:48 +02:00
Files

54 lines
1.7 KiB
C#

using System.Threading.Channels;
using Phantom.Common.Messages.Agent;
using Phantom.Utils.Logging;
using Serilog;
namespace Phantom.Agent.Services.Rpc;
/// <summary>
/// Bounded queue of messages destined for the controller. Messages are written with <see cref="Enqueue"/>
/// and forwarded one at a time, in order, by a single background task started in the constructor.
/// When the queue is full, the channel is configured to drop the oldest queued message.
/// </summary>
/// <typeparam name="TMessage">Type of message stored in the queue.</typeparam>
sealed class ControllerSendQueue<TMessage> where TMessage : IMessageToController {
    private readonly ILogger logger;
    private readonly Channel<TMessage> channel;
    private readonly Task sendTask;
    private readonly CancellationTokenSource shutdownTokenSource = new ();

    /// <summary>
    /// Creates the queue and immediately starts the background send loop.
    /// </summary>
    /// <param name="controllerConnection">Connection every dequeued message is sent through.</param>
    /// <param name="loggerName">Name passed to the logger factory for this queue instance.</param>
    /// <param name="capacity">Maximum number of messages the channel holds at once.</param>
    /// <param name="singleWriter">Forwarded to the channel's <c>SingleWriter</c> option.</param>
    public ControllerSendQueue(ControllerConnection controllerConnection, string loggerName, int capacity, bool singleWriter) {
        this.logger = PhantomLogger.Create<ControllerSendQueue<TMessage>>(loggerName);
        this.channel = Channel.CreateBounded<TMessage>(new BoundedChannelOptions(capacity) {
            AllowSynchronousContinuations = false,
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = singleWriter,
        });

        // Must stay last: the send loop reads the logger and channel fields assigned above.
        this.sendTask = Send(controllerConnection, shutdownTokenSource.Token);
    }

    /// <summary>
    /// Send loop: reads messages from the channel until the writer is completed or the token is
    /// cancelled, and sends each one through the connection.
    /// </summary>
    /// <param name="controllerConnection">Connection used to send each message.</param>
    /// <param name="cancellationToken">Token that aborts both reading and sending.</param>
    private async Task Send(ControllerConnection controllerConnection, CancellationToken cancellationToken) {
        try {
            await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken)) {
                await controllerConnection.Send(message, cancellationToken);
            }
        } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
            // Expected when Shutdown cancels the token; not worth logging.
            throw;
        } catch (Exception e) {
            // Once the loop ends nothing drains the channel any more, so make the failure visible
            // here instead of letting it go unnoticed until (or beyond) Shutdown.
            logger.Error(e, "Caught exception while sending queued messages, the queue will stop processing.");
            throw;
        }
    }

    /// <summary>
    /// Adds a message to the queue without waiting. The result of the write is intentionally not
    /// reported to the caller.
    /// </summary>
    /// <param name="message">Message to queue.</param>
    public void Enqueue(TMessage message) {
        channel.Writer.TryWrite(message);
    }

    /// <summary>
    /// Stops accepting new messages, waits up to <paramref name="gracefulTimeout"/> for the send loop
    /// to finish on its own, then cancels it and waits for it to end.
    /// </summary>
    /// <param name="gracefulTimeout">Maximum time to wait before cancelling the send loop.</param>
    /// <remarks>
    /// Disposes the internal cancellation token source, so this is presumably meant to be called
    /// once per instance - TODO confirm against callers.
    /// </remarks>
    public async Task Shutdown(TimeSpan gracefulTimeout) {
        // Completing the writer lets the send loop end once the remaining messages are read.
        channel.Writer.TryComplete();

        try {
            await sendTask.WaitAsync(gracefulTimeout);
        } catch (TimeoutException) {
            logger.Warning("Timed out waiting for queue to finish processing.");
        } catch (Exception) {
            // Ignore: a failure of the send loop has already been logged by the loop itself.
        }

        // Abort whatever is still running, then wait for the loop to end without rethrowing its
        // cancellation or failure.
        await shutdownTokenSource.CancelAsync();
        await sendTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        shutdownTokenSource.Dispose();
    }
}