1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-10-18 06:42:50 +02:00

Compare commits

...

7 Commits

28 changed files with 134 additions and 87 deletions

View File

@ -7,7 +7,7 @@ public sealed class InstanceProcess : IDisposable {
public InstanceProperties InstanceProperties { get; }
public CancellableSemaphore BackupSemaphore { get; } = new (1, 1);
private readonly RingBuffer<string> outputBuffer = new (10000);
private readonly RingBuffer<string> outputBuffer = new (100);
private event EventHandler<string>? OutputEvent;
public event EventHandler? Ended;

View File

@ -79,10 +79,9 @@ public sealed class JavaRuntimeDiscovery {
WorkingDirectory = Path.GetDirectoryName(javaExecutablePath),
Arguments = "-XshowSettings:properties -version",
RedirectStandardInput = false,
RedirectStandardOutput = true,
RedirectStandardOutput = false,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = false
UseShellExecute = false
};
var process = new Process { StartInfo = startInfo };

View File

@ -10,7 +10,7 @@ public sealed partial class MinecraftServerExecutables {
private static readonly ILogger Logger = PhantomLogger.Create<MinecraftServerExecutables>();
[GeneratedRegex(@"[^a-zA-Z0-9_\-\.]", RegexOptions.Compiled)]
private static partial Regex VersionFolderSanitizeRegex();
private static partial Regex SanitizePathRegex();
private readonly string basePath;
private readonly Dictionary<string, MinecraftServerExecutableDownloader> runningDownloadersByVersion = new ();
@ -20,7 +20,7 @@ public sealed partial class MinecraftServerExecutables {
}
internal async Task<string?> DownloadAndGetPath(FileDownloadInfo? fileDownloadInfo, string minecraftVersion, EventHandler<DownloadProgressEventArgs> progressEventHandler, CancellationToken cancellationToken) {
string serverExecutableFolderPath = Path.Combine(basePath, VersionFolderSanitizeRegex().Replace(minecraftVersion, "_"));
string serverExecutableFolderPath = Path.Combine(basePath, SanitizePathRegex().IsMatch(minecraftVersion) ? SanitizePathRegex().Replace(minecraftVersion, "_") : minecraftVersion);
string serverExecutableFilePath = Path.Combine(serverExecutableFolderPath, "server.jar");
if (File.Exists(serverExecutableFilePath)) {

View File

@ -78,7 +78,7 @@ public sealed class ServerStatusProtocol {
return null;
}
string onlinePlayerCountStr = Encoding.BigEndianUnicode.GetString(messageBuffer[(separator1 + 1)..(separator2 - 1)]);
string onlinePlayerCountStr = Encoding.BigEndianUnicode.GetString(messageBuffer.AsSpan((separator1 + 1)..(separator2 - 1)));
if (!int.TryParse(onlinePlayerCountStr, out int onlinePlayerCount)) {
logger.Error("Could not parse online player count in response from server: {OnlinePlayerCount}.", onlinePlayerCountStr);
return null;

View File

@ -13,7 +13,7 @@ using Serilog.Events;
namespace Phantom.Agent.Rpc;
public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
public static Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
var socket = new ClientSocket();
var options = socket.Options;
@ -21,7 +21,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = MessageRegistries.ToServer.Write(new RegisterAgentMessage(authToken, agentInfo)).ToArray();
await new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch();
return new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch();
}
private readonly RpcConfiguration config;

View File

@ -0,0 +1,3 @@
namespace Phantom.Agent.Services;
public readonly record struct AgentServiceConfiguration(int MaxConcurrentCompressionTasks);

View File

@ -18,10 +18,10 @@ public sealed class AgentServices {
internal JavaRuntimeRepository JavaRuntimeRepository { get; }
internal InstanceSessionManager InstanceSessionManager { get; }
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) {
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration) {
this.AgentFolders = agentFolders;
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.BackupManager = new BackupManager(agentFolders);
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
}
@ -40,6 +40,8 @@ public sealed class AgentServices {
await TaskManager.Stop();
BackupManager.Dispose();
Logger.Information("Services stopped.");
}
}

View File

@ -5,15 +5,21 @@ using Serilog;
namespace Phantom.Agent.Services.Backups;
sealed class BackupManager {
sealed class BackupManager : IDisposable {
private readonly string destinationBasePath;
private readonly string temporaryBasePath;
private readonly SemaphoreSlim compressionSemaphore;
public BackupManager(AgentFolders agentFolders) {
public BackupManager(AgentFolders agentFolders, int maxConcurrentCompressionTasks) {
this.destinationBasePath = agentFolders.BackupsFolderPath;
this.temporaryBasePath = Path.Combine(agentFolders.TemporaryFolderPath, "backups");
this.compressionSemaphore = new SemaphoreSlim(maxConcurrentCompressionTasks, maxConcurrentCompressionTasks);
}
public void Dispose() {
compressionSemaphore.Dispose();
}
public async Task<BackupCreationResult> CreateBackup(string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
try {
if (!await process.BackupSemaphore.Wait(TimeSpan.FromSeconds(1), cancellationToken)) {
@ -26,23 +32,21 @@ sealed class BackupManager {
}
try {
return await new BackupCreator(destinationBasePath, temporaryBasePath, loggerName, process, cancellationToken).CreateBackup();
return await new BackupCreator(this, loggerName, process, cancellationToken).CreateBackup();
} finally {
process.BackupSemaphore.Release();
}
}
private sealed class BackupCreator {
private readonly string destinationBasePath;
private readonly string temporaryBasePath;
private readonly BackupManager manager;
private readonly string loggerName;
private readonly ILogger logger;
private readonly InstanceProcess process;
private readonly CancellationToken cancellationToken;
public BackupCreator(string destinationBasePath, string temporaryBasePath, string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
this.destinationBasePath = destinationBasePath;
this.temporaryBasePath = temporaryBasePath;
public BackupCreator(BackupManager manager, string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
this.manager = manager;
this.loggerName = loggerName;
this.logger = PhantomLogger.Create<BackupManager>(loggerName);
this.process = process;
@ -72,7 +76,7 @@ sealed class BackupManager {
try {
await dispatcher.DisableAutomaticSaving();
await dispatcher.SaveAllChunks();
return await new BackupArchiver(destinationBasePath, temporaryBasePath, loggerName, process.InstanceProperties, cancellationToken).ArchiveWorld(resultBuilder);
return await new BackupArchiver(manager.destinationBasePath, manager.temporaryBasePath, loggerName, process.InstanceProperties, cancellationToken).ArchiveWorld(resultBuilder);
} catch (OperationCanceledException) {
resultBuilder.Kind = BackupCreationResultKind.BackupCancelled;
logger.Warning("Backup creation was cancelled.");
@ -92,11 +96,21 @@ sealed class BackupManager {
}
}
}
private async Task CompressWorldArchive(string filePath, BackupCreationResult.Builder resultBuilder) {
var compressedFilePath = await BackupCompressor.Compress(filePath, cancellationToken);
if (compressedFilePath == null) {
resultBuilder.Warnings |= BackupCreationWarnings.CouldNotCompressWorldArchive;
if (!await manager.compressionSemaphore.WaitAsync(TimeSpan.FromSeconds(1), cancellationToken)) {
logger.Information("Too many compression tasks running, waiting for one of them to complete...");
await manager.compressionSemaphore.WaitAsync(cancellationToken);
}
logger.Information("Compressing backup...");
try {
var compressedFilePath = await BackupCompressor.Compress(filePath, cancellationToken);
if (compressedFilePath == null) {
resultBuilder.Warnings |= BackupCreationWarnings.CouldNotCompressWorldArchive;
}
} finally {
manager.compressionSemaphore.Release();
}
}

View File

@ -27,6 +27,7 @@ sealed class BackupScheduler : CancellableBackgroundTask {
this.process = process;
this.serverPort = serverPort;
this.serverStatusProtocol = new ServerStatusProtocol(loggerName);
Start();
}
protected override async Task RunTask() {
@ -93,4 +94,8 @@ sealed class BackupScheduler : CancellableBackgroundTask {
Logger.Debug("Detected server output, signalling to check for online players again.");
}
}
protected override void Dispose() {
serverOutputWhileWaitingForOnlinePlayers.Dispose();
}
}

View File

@ -1,36 +1,62 @@
using System.Collections.Immutable;
using System.Threading.Channels;
using Phantom.Agent.Rpc;
using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Collections;
using Phantom.Utils.Runtime;
namespace Phantom.Agent.Services.Instances;
sealed class InstanceLogSender : CancellableBackgroundTask {
private static readonly BoundedChannelOptions BufferOptions = new (capacity: 64) {
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.DropNewest
};
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
private readonly Guid instanceGuid;
private readonly SemaphoreSlim semaphore = new (1, 1);
private readonly RingBuffer<string> buffer = new (1000);
private readonly Channel<string> outputChannel;
private int droppedLinesSinceLastSend;
public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
this.instanceGuid = instanceGuid;
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
Start();
}
protected override async Task RunTask() {
var lineReader = outputChannel.Reader;
var lineBuilder = ImmutableArray.CreateBuilder<string>();
try {
while (!CancellationToken.IsCancellationRequested) {
await SendOutputToServer(await DequeueOrThrow());
while (await lineReader.WaitToReadAsync(CancellationToken)) {
await Task.Delay(SendDelay, CancellationToken);
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
}
} catch (OperationCanceledException) {
// Ignore.
}
// Flush remaining lines.
await SendOutputToServer(DequeueWithoutSemaphore());
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
}
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
builder.Clear();
while (reader.TryRead(out string? line)) {
builder.Add(line);
}
int droppedLines = Interlocked.Exchange(ref droppedLinesSinceLastSend, 0);
if (droppedLines > 0) {
builder.Add($"Dropped {droppedLines} {(droppedLines == 1 ? "line" : "lines")} due to buffer overflow.");
}
return builder.ToImmutable();
}
private async Task SendOutputToServer(ImmutableArray<string> lines) {
@ -39,33 +65,18 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
}
}
private ImmutableArray<string> DequeueWithoutSemaphore() {
ImmutableArray<string> lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
buffer.Clear();
return lines;
}
private async Task<ImmutableArray<string>> DequeueOrThrow() {
await semaphore.WaitAsync(CancellationToken);
try {
return DequeueWithoutSemaphore();
} finally {
semaphore.Release();
}
private void OnLineDropped(string line) {
Logger.Warning("Buffer is full, dropped line: {Line}", line);
Interlocked.Increment(ref droppedLinesSinceLastSend);
}
public void Enqueue(string line) {
try {
semaphore.Wait(CancellationToken);
} catch (Exception) {
return;
}
outputChannel.Writer.TryWrite(line);
}
try {
buffer.Add(line);
} finally {
semaphore.Release();
protected override void Dispose() {
if (!outputChannel.Writer.TryComplete()) {
Logger.Error("Could not mark channel as completed.");
}
}
}

View File

@ -17,13 +17,15 @@ PosixSignals.RegisterCancellation(shutdownCancellationTokenSource, static () =>
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel agent...");
});
ThreadPool.SetMinThreads(workerThreads: 2, completionPortThreads: 1);
try {
var fullVersion = AssemblyAttributes.GetFullVersion(Assembly.GetExecutingAssembly());
PhantomLogger.Root.InformationHeading("Initializing Phantom Panel agent...");
PhantomLogger.Root.Information("Agent version: {Version}", fullVersion);
var (serverHost, serverPort, javaSearchPath, agentKeyToken, agentKeyFilePath, agentName, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts) = Variables.LoadOrExit();
var (serverHost, serverPort, javaSearchPath, agentKeyToken, agentKeyFilePath, agentName, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts, maxConcurrentBackupCompressionTasks) = Variables.LoadOrExit();
var agentKey = await AgentKey.Load(agentKeyToken, agentKeyFilePath);
if (agentKey == null) {
@ -42,7 +44,7 @@ try {
var (serverCertificate, agentToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders);
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks));
MessageListener MessageListenerFactory(RpcServerConnection connection) {
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);

View File

@ -15,7 +15,8 @@ sealed record Variables(
ushort MaxInstances,
RamAllocationUnits MaxMemory,
AllowedPorts AllowedServerPorts,
AllowedPorts AllowedRconPorts
AllowedPorts AllowedRconPorts,
ushort MaxConcurrentBackupCompressionTasks
) {
private static Variables LoadOrThrow() {
var (agentKeyToken, agentKeyFilePath) = EnvironmentVariables.GetEitherString("AGENT_KEY", "AGENT_KEY_FILE").Require;
@ -31,7 +32,8 @@ sealed record Variables(
(ushort) EnvironmentVariables.GetInteger("MAX_INSTANCES", min: 1, max: 10000).Require,
EnvironmentVariables.GetString("MAX_MEMORY").MapParse(RamAllocationUnits.FromString).Require,
EnvironmentVariables.GetString("ALLOWED_SERVER_PORTS").MapParse(AllowedPorts.FromString).Require,
EnvironmentVariables.GetString("ALLOWED_RCON_PORTS").MapParse(AllowedPorts.FromString).Require
EnvironmentVariables.GetString("ALLOWED_RCON_PORTS").MapParse(AllowedPorts.FromString).Require,
(ushort) EnvironmentVariables.GetInteger("MAX_CONCURRENT_BACKUP_COMPRESSION_TASKS", min: 1, max: 10000).WithDefault(1)
);
}

View File

@ -53,6 +53,6 @@ public sealed partial class AllowedPorts {
}
public static AllowedPorts FromString(string definitions) {
return FromString((ReadOnlySpan<char>) definitions);
return FromString(definitions.AsSpan());
}
}

View File

@ -98,6 +98,6 @@ public readonly partial record struct RamAllocationUnits(
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">If the <paramref name="definition"/> is in the incorrect format, or the value cannot be converted via <see cref="FromMegabytes"/>.</exception>
public static RamAllocationUnits FromString(string definition) {
return FromString((ReadOnlySpan<char>) definition);
return FromString(definition.AsSpan());
}
}

View File

@ -5,7 +5,7 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable]
public partial record ReportAgentStatusMessage(
public sealed partial record ReportAgentStatusMessage(
[property: MemoryPackOrder(0)] int RunningInstanceCount,
[property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory
) : IMessageToServer {

View File

@ -1,11 +1,11 @@
<Project>
<ItemGroup>
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.1" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.1" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.3" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.3" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.3" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.3" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.3" />
</ItemGroup>
<ItemGroup>
@ -13,7 +13,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="MemoryPack" Version="1.9.7" />
<PackageReference Update="MemoryPack" Version="1.9.13" />
<PackageReference Update="NetMQ" Version="4.0.1.10" />
</ItemGroup>
@ -26,10 +26,10 @@
<ItemGroup>
<PackageReference Update="coverlet.collector" Version="3.2.0" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Update="NUnit" Version="3.13.3" />
<PackageReference Update="NUnit.Analyzers" Version="3.5.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.3.1" />
<PackageReference Update="NUnit.Analyzers" Version="3.6.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.4.2" />
</ItemGroup>
</Project>

View File

@ -92,6 +92,7 @@ Use volumes to persist either the whole `/data` folder, or just `/data/data` if
* **Agent Configuration**
- `MAX_INSTANCES` is the number of instances that can be created.
- `MAX_MEMORY` is the maximum amount of RAM that can be distributed among all instances. Use a positive integer with an optional suffix 'M' for MB, or 'G' for GB. Examples: `4096M`, `16G`
- `MAX_CONCURRENT_BACKUP_COMPRESSION_TASKS` is how many backup compression tasks can run at the same time. Limiting concurrent compression tasks limits memory usage of compression, but it increases time between backups because the next backup is only scheduled once the current one completes. Default: `1`
* **Minecraft Configuration**
- `JAVA_SEARCH_PATH` is a path to a folder which will be searched for Java runtime installations. Linux default: `/usr/lib/jvm`
- `ALLOWED_SERVER_PORTS` is a comma-separated list of ports and port ranges that can be used as Minecraft Server ports. Example: `25565,25900,26000-27000`

View File

@ -15,7 +15,7 @@ public enum AuditLogEventType {
InstanceCommandExecuted
}
public static class AuditLogEventTypeExtensions {
static class AuditLogEventTypeExtensions {
private static readonly Dictionary<AuditLogEventType, AuditLogSubjectType> SubjectTypes = new () {
{ AuditLogEventType.AdministratorUserCreated, AuditLogSubjectType.User },
{ AuditLogEventType.AdministratorUserModified, AuditLogSubjectType.User },

View File

@ -10,7 +10,7 @@ public enum EventLogEventType {
InstanceBackupFailed,
}
internal static class EventLogEventTypeExtensions {
static class EventLogEventTypeExtensions {
private static readonly Dictionary<EventLogEventType, EventLogSubjectType> SubjectTypes = new () {
{ EventLogEventType.InstanceLaunchSucceded, EventLogSubjectType.Instance },
{ EventLogEventType.InstanceLaunchFailed, EventLogSubjectType.Instance },

View File

@ -11,14 +11,14 @@ using Serilog.Events;
namespace Phantom.Server.Rpc;
public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
public static async Task Launch(RpcConfiguration config, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) {
public static Task Launch(RpcConfiguration config, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
await new RpcLauncher(config, socket, listenerFactory, cancellationToken).Launch();
return new RpcLauncher(config, socket, listenerFactory, cancellationToken).Launch();
}
private readonly RpcConfiguration config;

View File

@ -25,9 +25,7 @@ public sealed partial class AuditLog {
}
public Task AddUserRolesChangedEvent(IdentityUser user, List<string> addedToRoles, List<string> removedFromRoles) {
var extra = new Dictionary<string, object?> {
{ "username", user.UserName },
};
var extra = new Dictionary<string, object?>();
if (addedToRoles.Count > 0) {
extra["addedToRoles"] = addedToRoles;
@ -37,7 +35,7 @@ public sealed partial class AuditLog {
extra["removedFromRoles"] = removedFromRoles;
}
return AddItem(AuditLogEventType.UserDeleted, user.Id, extra);
return AddItem(AuditLogEventType.UserRolesChanged, user.Id, extra);
}
public Task AddUserDeletedEvent(IdentityUser user) {

View File

@ -2,8 +2,6 @@
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Server;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Phantom.Server.Web.Identity.Authentication;

View File

@ -1,10 +1,7 @@
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.DependencyInjection;
using Phantom.Server.Database;
using Phantom.Server.Web.Identity.Authentication;
using Phantom.Server.Web.Identity.Authorization;

View File

@ -1,5 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.AspNetCore.Http;
using Phantom.Server.Web.Identity.Authentication;
using Phantom.Server.Web.Identity.Interfaces;

View File

@ -68,10 +68,14 @@ public sealed class Table<TRow, TKey> : IReadOnlyList<TRow>, IReadOnlyDictionary
}
}
public IEnumerator<TRow> GetEnumerator() {
public List<TRow>.Enumerator GetEnumerator() {
return rowList.GetEnumerator();
}
IEnumerator<TRow> IEnumerable<TRow>.GetEnumerator() {
return GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}

View File

@ -19,7 +19,7 @@ public abstract class MessageHandler<TListener> {
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested();
taskManager.Run("Handle message {Type}" + message.GetType().Name, async () => {
taskManager.Run("Handle message " + message.GetType().Name, async () => {
try {
await Handle<TMessage, TReply>(sequenceId, message);
} catch (Exception e) {

View File

@ -8,7 +8,7 @@ static class MessageSerializer {
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
public static byte[] Serialize<T>(T message) {
return MemoryPackSerializer.Serialize(typeof(T), message, SerializerOptions);
return MemoryPackSerializer.Serialize(message, SerializerOptions);
}
public static void Serialize<T>(IBufferWriter<byte> destination, T message) {

View File

@ -8,9 +8,18 @@ public abstract class CancellableBackgroundTask {
protected ILogger Logger { get; }
protected CancellationToken CancellationToken { get; }
private readonly TaskManager taskManager;
private readonly string taskName;
protected CancellableBackgroundTask(ILogger logger, TaskManager taskManager, string taskName) {
this.Logger = logger;
this.CancellationToken = cancellationTokenSource.Token;
this.taskManager = taskManager;
this.taskName = taskName;
}
protected void Start() {
taskManager.Run(taskName, Run);
}
@ -25,11 +34,14 @@ public abstract class CancellableBackgroundTask {
Logger.Fatal(e, "Caught exception in task.");
} finally {
cancellationTokenSource.Dispose();
Dispose();
Logger.Debug("Task stopped.");
}
}
protected abstract Task RunTask();
protected abstract void Dispose();
public void Stop() {
try {