1
0
mirror of https://github.com/chylex/Minecraft-Phantom-Panel.git synced 2024-11-26 01:42:53 +01:00

Compare commits

..

No commits in common. "9d9734d1fdabff1e3b17f7b3a8dda50c69944be0" and "dd57c442af945621526e484d62cf130dcf0dcdaf" have entirely different histories.

28 changed files with 87 additions and 134 deletions

View File

@ -7,7 +7,7 @@ public sealed class InstanceProcess : IDisposable {
public InstanceProperties InstanceProperties { get; }
public CancellableSemaphore BackupSemaphore { get; } = new (1, 1);
private readonly RingBuffer<string> outputBuffer = new (100);
private readonly RingBuffer<string> outputBuffer = new (10000);
private event EventHandler<string>? OutputEvent;
public event EventHandler? Ended;

View File

@ -79,9 +79,10 @@ public sealed class JavaRuntimeDiscovery {
WorkingDirectory = Path.GetDirectoryName(javaExecutablePath),
Arguments = "-XshowSettings:properties -version",
RedirectStandardInput = false,
RedirectStandardOutput = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false
UseShellExecute = false,
CreateNoWindow = false
};
var process = new Process { StartInfo = startInfo };

View File

@ -10,7 +10,7 @@ public sealed partial class MinecraftServerExecutables {
private static readonly ILogger Logger = PhantomLogger.Create<MinecraftServerExecutables>();
[GeneratedRegex(@"[^a-zA-Z0-9_\-\.]", RegexOptions.Compiled)]
private static partial Regex SanitizePathRegex();
private static partial Regex VersionFolderSanitizeRegex();
private readonly string basePath;
private readonly Dictionary<string, MinecraftServerExecutableDownloader> runningDownloadersByVersion = new ();
@ -20,7 +20,7 @@ public sealed partial class MinecraftServerExecutables {
}
internal async Task<string?> DownloadAndGetPath(FileDownloadInfo? fileDownloadInfo, string minecraftVersion, EventHandler<DownloadProgressEventArgs> progressEventHandler, CancellationToken cancellationToken) {
string serverExecutableFolderPath = Path.Combine(basePath, SanitizePathRegex().IsMatch(minecraftVersion) ? SanitizePathRegex().Replace(minecraftVersion, "_") : minecraftVersion);
string serverExecutableFolderPath = Path.Combine(basePath, VersionFolderSanitizeRegex().Replace(minecraftVersion, "_"));
string serverExecutableFilePath = Path.Combine(serverExecutableFolderPath, "server.jar");
if (File.Exists(serverExecutableFilePath)) {

View File

@ -78,7 +78,7 @@ public sealed class ServerStatusProtocol {
return null;
}
string onlinePlayerCountStr = Encoding.BigEndianUnicode.GetString(messageBuffer.AsSpan((separator1 + 1)..(separator2 - 1)));
string onlinePlayerCountStr = Encoding.BigEndianUnicode.GetString(messageBuffer[(separator1 + 1)..(separator2 - 1)]);
if (!int.TryParse(onlinePlayerCountStr, out int onlinePlayerCount)) {
logger.Error("Could not parse online player count in response from server: {OnlinePlayerCount}.", onlinePlayerCountStr);
return null;

View File

@ -13,7 +13,7 @@ using Serilog.Events;
namespace Phantom.Agent.Rpc;
public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
public static Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
public static async Task Launch(RpcConfiguration config, AgentAuthToken authToken, AgentInfo agentInfo, Func<RpcServerConnection, IMessageToAgentListener> listenerFactory, SemaphoreSlim disconnectSemaphore, CancellationToken receiveCancellationToken) {
var socket = new ClientSocket();
var options = socket.Options;
@ -21,7 +21,7 @@ public sealed class RpcLauncher : RpcRuntime<ClientSocket> {
options.CurveCertificate = new NetMQCertificate();
options.HelloMessage = MessageRegistries.ToServer.Write(new RegisterAgentMessage(authToken, agentInfo)).ToArray();
return new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch();
await new RpcLauncher(config, socket, agentInfo.Guid, listenerFactory, disconnectSemaphore, receiveCancellationToken).Launch();
}
private readonly RpcConfiguration config;

View File

@ -1,3 +0,0 @@
namespace Phantom.Agent.Services;
public readonly record struct AgentServiceConfiguration(int MaxConcurrentCompressionTasks);

View File

@ -18,10 +18,10 @@ public sealed class AgentServices {
internal JavaRuntimeRepository JavaRuntimeRepository { get; }
internal InstanceSessionManager InstanceSessionManager { get; }
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders, AgentServiceConfiguration serviceConfiguration) {
public AgentServices(AgentInfo agentInfo, AgentFolders agentFolders) {
this.AgentFolders = agentFolders;
this.TaskManager = new TaskManager(PhantomLogger.Create<TaskManager, AgentServices>());
this.BackupManager = new BackupManager(agentFolders, serviceConfiguration.MaxConcurrentCompressionTasks);
this.BackupManager = new BackupManager(agentFolders);
this.JavaRuntimeRepository = new JavaRuntimeRepository();
this.InstanceSessionManager = new InstanceSessionManager(agentInfo, agentFolders, JavaRuntimeRepository, TaskManager, BackupManager);
}
@ -40,8 +40,6 @@ public sealed class AgentServices {
await TaskManager.Stop();
BackupManager.Dispose();
Logger.Information("Services stopped.");
}
}

View File

@ -5,19 +5,13 @@ using Serilog;
namespace Phantom.Agent.Services.Backups;
sealed class BackupManager : IDisposable {
sealed class BackupManager {
private readonly string destinationBasePath;
private readonly string temporaryBasePath;
private readonly SemaphoreSlim compressionSemaphore;
public BackupManager(AgentFolders agentFolders, int maxConcurrentCompressionTasks) {
public BackupManager(AgentFolders agentFolders) {
this.destinationBasePath = agentFolders.BackupsFolderPath;
this.temporaryBasePath = Path.Combine(agentFolders.TemporaryFolderPath, "backups");
this.compressionSemaphore = new SemaphoreSlim(maxConcurrentCompressionTasks, maxConcurrentCompressionTasks);
}
public void Dispose() {
compressionSemaphore.Dispose();
}
public async Task<BackupCreationResult> CreateBackup(string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
@ -32,21 +26,23 @@ sealed class BackupManager : IDisposable {
}
try {
return await new BackupCreator(this, loggerName, process, cancellationToken).CreateBackup();
return await new BackupCreator(destinationBasePath, temporaryBasePath, loggerName, process, cancellationToken).CreateBackup();
} finally {
process.BackupSemaphore.Release();
}
}
private sealed class BackupCreator {
private readonly BackupManager manager;
private readonly string destinationBasePath;
private readonly string temporaryBasePath;
private readonly string loggerName;
private readonly ILogger logger;
private readonly InstanceProcess process;
private readonly CancellationToken cancellationToken;
public BackupCreator(BackupManager manager, string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
this.manager = manager;
public BackupCreator(string destinationBasePath, string temporaryBasePath, string loggerName, InstanceProcess process, CancellationToken cancellationToken) {
this.destinationBasePath = destinationBasePath;
this.temporaryBasePath = temporaryBasePath;
this.loggerName = loggerName;
this.logger = PhantomLogger.Create<BackupManager>(loggerName);
this.process = process;
@ -76,7 +72,7 @@ sealed class BackupManager : IDisposable {
try {
await dispatcher.DisableAutomaticSaving();
await dispatcher.SaveAllChunks();
return await new BackupArchiver(manager.destinationBasePath, manager.temporaryBasePath, loggerName, process.InstanceProperties, cancellationToken).ArchiveWorld(resultBuilder);
return await new BackupArchiver(destinationBasePath, temporaryBasePath, loggerName, process.InstanceProperties, cancellationToken).ArchiveWorld(resultBuilder);
} catch (OperationCanceledException) {
resultBuilder.Kind = BackupCreationResultKind.BackupCancelled;
logger.Warning("Backup creation was cancelled.");
@ -98,19 +94,9 @@ sealed class BackupManager : IDisposable {
}
private async Task CompressWorldArchive(string filePath, BackupCreationResult.Builder resultBuilder) {
if (!await manager.compressionSemaphore.WaitAsync(TimeSpan.FromSeconds(1), cancellationToken)) {
logger.Information("Too many compression tasks running, waiting for one of them to complete...");
await manager.compressionSemaphore.WaitAsync(cancellationToken);
}
logger.Information("Compressing backup...");
try {
var compressedFilePath = await BackupCompressor.Compress(filePath, cancellationToken);
if (compressedFilePath == null) {
resultBuilder.Warnings |= BackupCreationWarnings.CouldNotCompressWorldArchive;
}
} finally {
manager.compressionSemaphore.Release();
var compressedFilePath = await BackupCompressor.Compress(filePath, cancellationToken);
if (compressedFilePath == null) {
resultBuilder.Warnings |= BackupCreationWarnings.CouldNotCompressWorldArchive;
}
}

View File

@ -27,7 +27,6 @@ sealed class BackupScheduler : CancellableBackgroundTask {
this.process = process;
this.serverPort = serverPort;
this.serverStatusProtocol = new ServerStatusProtocol(loggerName);
Start();
}
protected override async Task RunTask() {
@ -94,8 +93,4 @@ sealed class BackupScheduler : CancellableBackgroundTask {
Logger.Debug("Detected server output, signalling to check for online players again.");
}
}
protected override void Dispose() {
serverOutputWhileWaitingForOnlinePlayers.Dispose();
}
}

View File

@ -1,62 +1,36 @@
using System.Collections.Immutable;
using System.Threading.Channels;
using Phantom.Agent.Rpc;
using Phantom.Common.Logging;
using Phantom.Common.Messages.ToServer;
using Phantom.Utils.Collections;
using Phantom.Utils.Runtime;
namespace Phantom.Agent.Services.Instances;
sealed class InstanceLogSender : CancellableBackgroundTask {
private static readonly BoundedChannelOptions BufferOptions = new (capacity: 64) {
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.DropNewest
};
private static readonly TimeSpan SendDelay = TimeSpan.FromMilliseconds(200);
private readonly Guid instanceGuid;
private readonly Channel<string> outputChannel;
private int droppedLinesSinceLastSend;
private readonly SemaphoreSlim semaphore = new (1, 1);
private readonly RingBuffer<string> buffer = new (1000);
public InstanceLogSender(TaskManager taskManager, Guid instanceGuid, string loggerName) : base(PhantomLogger.Create<InstanceLogSender>(loggerName), taskManager, "Instance log sender for " + loggerName) {
this.instanceGuid = instanceGuid;
this.outputChannel = Channel.CreateBounded<string>(BufferOptions, OnLineDropped);
Start();
}
protected override async Task RunTask() {
var lineReader = outputChannel.Reader;
var lineBuilder = ImmutableArray.CreateBuilder<string>();
try {
while (await lineReader.WaitToReadAsync(CancellationToken)) {
while (!CancellationToken.IsCancellationRequested) {
await SendOutputToServer(await DequeueOrThrow());
await Task.Delay(SendDelay, CancellationToken);
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
}
} catch (OperationCanceledException) {
// Ignore.
}
// Flush remaining lines.
await SendOutputToServer(ReadLinesFromChannel(lineReader, lineBuilder));
}
private ImmutableArray<string> ReadLinesFromChannel(ChannelReader<string> reader, ImmutableArray<string>.Builder builder) {
builder.Clear();
while (reader.TryRead(out string? line)) {
builder.Add(line);
}
int droppedLines = Interlocked.Exchange(ref droppedLinesSinceLastSend, 0);
if (droppedLines > 0) {
builder.Add($"Dropped {droppedLines} {(droppedLines == 1 ? "line" : "lines")} due to buffer overflow.");
}
return builder.ToImmutable();
await SendOutputToServer(DequeueWithoutSemaphore());
}
private async Task SendOutputToServer(ImmutableArray<string> lines) {
@ -65,18 +39,33 @@ sealed class InstanceLogSender : CancellableBackgroundTask {
}
}
private void OnLineDropped(string line) {
Logger.Warning("Buffer is full, dropped line: {Line}", line);
Interlocked.Increment(ref droppedLinesSinceLastSend);
private ImmutableArray<string> DequeueWithoutSemaphore() {
ImmutableArray<string> lines = buffer.Count > 0 ? buffer.EnumerateLast(uint.MaxValue).ToImmutableArray() : ImmutableArray<string>.Empty;
buffer.Clear();
return lines;
}
private async Task<ImmutableArray<string>> DequeueOrThrow() {
await semaphore.WaitAsync(CancellationToken);
try {
return DequeueWithoutSemaphore();
} finally {
semaphore.Release();
}
}
public void Enqueue(string line) {
outputChannel.Writer.TryWrite(line);
}
try {
semaphore.Wait(CancellationToken);
} catch (Exception) {
return;
}
protected override void Dispose() {
if (!outputChannel.Writer.TryComplete()) {
Logger.Error("Could not mark channel as completed.");
try {
buffer.Add(line);
} finally {
semaphore.Release();
}
}
}

View File

@ -17,15 +17,13 @@ PosixSignals.RegisterCancellation(shutdownCancellationTokenSource, static () =>
PhantomLogger.Root.InformationHeading("Stopping Phantom Panel agent...");
});
ThreadPool.SetMinThreads(workerThreads: 2, completionPortThreads: 1);
try {
var fullVersion = AssemblyAttributes.GetFullVersion(Assembly.GetExecutingAssembly());
PhantomLogger.Root.InformationHeading("Initializing Phantom Panel agent...");
PhantomLogger.Root.Information("Agent version: {Version}", fullVersion);
var (serverHost, serverPort, javaSearchPath, agentKeyToken, agentKeyFilePath, agentName, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts, maxConcurrentBackupCompressionTasks) = Variables.LoadOrExit();
var (serverHost, serverPort, javaSearchPath, agentKeyToken, agentKeyFilePath, agentName, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts) = Variables.LoadOrExit();
var agentKey = await AgentKey.Load(agentKeyToken, agentKeyFilePath);
if (agentKey == null) {
@ -44,7 +42,7 @@ try {
var (serverCertificate, agentToken) = agentKey.Value;
var agentInfo = new AgentInfo(agentGuid.Value, agentName, ProtocolVersion, fullVersion, maxInstances, maxMemory, allowedServerPorts, allowedRconPorts);
var agentServices = new AgentServices(agentInfo, folders, new AgentServiceConfiguration(maxConcurrentBackupCompressionTasks));
var agentServices = new AgentServices(agentInfo, folders);
MessageListener MessageListenerFactory(RpcServerConnection connection) {
return new MessageListener(connection, agentServices, shutdownCancellationTokenSource);

View File

@ -15,8 +15,7 @@ sealed record Variables(
ushort MaxInstances,
RamAllocationUnits MaxMemory,
AllowedPorts AllowedServerPorts,
AllowedPorts AllowedRconPorts,
ushort MaxConcurrentBackupCompressionTasks
AllowedPorts AllowedRconPorts
) {
private static Variables LoadOrThrow() {
var (agentKeyToken, agentKeyFilePath) = EnvironmentVariables.GetEitherString("AGENT_KEY", "AGENT_KEY_FILE").Require;
@ -32,8 +31,7 @@ sealed record Variables(
(ushort) EnvironmentVariables.GetInteger("MAX_INSTANCES", min: 1, max: 10000).Require,
EnvironmentVariables.GetString("MAX_MEMORY").MapParse(RamAllocationUnits.FromString).Require,
EnvironmentVariables.GetString("ALLOWED_SERVER_PORTS").MapParse(AllowedPorts.FromString).Require,
EnvironmentVariables.GetString("ALLOWED_RCON_PORTS").MapParse(AllowedPorts.FromString).Require,
(ushort) EnvironmentVariables.GetInteger("MAX_CONCURRENT_BACKUP_COMPRESSION_TASKS", min: 1, max: 10000).WithDefault(1)
EnvironmentVariables.GetString("ALLOWED_RCON_PORTS").MapParse(AllowedPorts.FromString).Require
);
}

View File

@ -53,6 +53,6 @@ public sealed partial class AllowedPorts {
}
public static AllowedPorts FromString(string definitions) {
return FromString(definitions.AsSpan());
return FromString((ReadOnlySpan<char>) definitions);
}
}

View File

@ -98,6 +98,6 @@ public readonly partial record struct RamAllocationUnits(
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">If the <paramref name="definition"/> is in the incorrect format, or the value cannot be converted via <see cref="FromMegabytes"/>.</exception>
public static RamAllocationUnits FromString(string definition) {
return FromString(definition.AsSpan());
return FromString((ReadOnlySpan<char>) definition);
}
}

View File

@ -5,7 +5,7 @@ using Phantom.Utils.Rpc.Message;
namespace Phantom.Common.Messages.ToServer;
[MemoryPackable]
public sealed partial record ReportAgentStatusMessage(
public partial record ReportAgentStatusMessage(
[property: MemoryPackOrder(0)] int RunningInstanceCount,
[property: MemoryPackOrder(1)] RamAllocationUnits RunningInstanceMemory
) : IMessageToServer {

View File

@ -1,11 +1,11 @@
<Project>
<ItemGroup>
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.3" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.3" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.3" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.3" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.3" />
<PackageReference Update="Microsoft.AspNetCore.Components.Authorization" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Components.Web" Version="7.0.1" />
<PackageReference Update="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.1" />
<PackageReference Update="Microsoft.EntityFrameworkCore.Tools" Version="7.0.1" />
<PackageReference Update="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.1" />
</ItemGroup>
<ItemGroup>
@ -13,7 +13,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="MemoryPack" Version="1.9.13" />
<PackageReference Update="MemoryPack" Version="1.9.7" />
<PackageReference Update="NetMQ" Version="4.0.1.10" />
</ItemGroup>
@ -26,10 +26,10 @@
<ItemGroup>
<PackageReference Update="coverlet.collector" Version="3.2.0" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Update="NUnit" Version="3.13.3" />
<PackageReference Update="NUnit.Analyzers" Version="3.6.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.4.2" />
<PackageReference Update="NUnit.Analyzers" Version="3.5.0" />
<PackageReference Update="NUnit3TestAdapter" Version="4.3.1" />
</ItemGroup>
</Project>

View File

@ -92,7 +92,6 @@ Use volumes to persist either the whole `/data` folder, or just `/data/data` if
* **Agent Configuration**
- `MAX_INSTANCES` is the number of instances that can be created.
- `MAX_MEMORY` is the maximum amount of RAM that can be distributed among all instances. Use a positive integer with an optional suffix 'M' for MB, or 'G' for GB. Examples: `4096M`, `16G`
- `MAX_CONCURRENT_BACKUP_COMPRESSION_TASKS` is how many backup compression tasks can run at the same time. Limiting concurrent compression tasks limits memory usage of compression, but it increases time between backups because the next backup is only scheduled once the current one completes. Default: `1`
* **Minecraft Configuration**
- `JAVA_SEARCH_PATH` is a path to a folder which will be searched for Java runtime installations. Linux default: `/usr/lib/jvm`
- `ALLOWED_SERVER_PORTS` is a comma-separated list of ports and port ranges that can be used as Minecraft Server ports. Example: `25565,25900,26000-27000`

View File

@ -15,7 +15,7 @@ public enum AuditLogEventType {
InstanceCommandExecuted
}
static class AuditLogEventTypeExtensions {
public static class AuditLogEventTypeExtensions {
private static readonly Dictionary<AuditLogEventType, AuditLogSubjectType> SubjectTypes = new () {
{ AuditLogEventType.AdministratorUserCreated, AuditLogSubjectType.User },
{ AuditLogEventType.AdministratorUserModified, AuditLogSubjectType.User },

View File

@ -10,7 +10,7 @@ public enum EventLogEventType {
InstanceBackupFailed,
}
static class EventLogEventTypeExtensions {
internal static class EventLogEventTypeExtensions {
private static readonly Dictionary<EventLogEventType, EventLogSubjectType> SubjectTypes = new () {
{ EventLogEventType.InstanceLaunchSucceded, EventLogSubjectType.Instance },
{ EventLogEventType.InstanceLaunchFailed, EventLogSubjectType.Instance },

View File

@ -11,14 +11,14 @@ using Serilog.Events;
namespace Phantom.Server.Rpc;
public sealed class RpcLauncher : RpcRuntime<ServerSocket> {
public static Task Launch(RpcConfiguration config, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) {
public static async Task Launch(RpcConfiguration config, Func<RpcClientConnection, IMessageToServerListener> listenerFactory, CancellationToken cancellationToken) {
var socket = new ServerSocket();
var options = socket.Options;
options.CurveServer = true;
options.CurveCertificate = config.ServerCertificate;
return new RpcLauncher(config, socket, listenerFactory, cancellationToken).Launch();
await new RpcLauncher(config, socket, listenerFactory, cancellationToken).Launch();
}
private readonly RpcConfiguration config;

View File

@ -25,7 +25,9 @@ public sealed partial class AuditLog {
}
public Task AddUserRolesChangedEvent(IdentityUser user, List<string> addedToRoles, List<string> removedFromRoles) {
var extra = new Dictionary<string, object?>();
var extra = new Dictionary<string, object?> {
{ "username", user.UserName },
};
if (addedToRoles.Count > 0) {
extra["addedToRoles"] = addedToRoles;
@ -35,7 +37,7 @@ public sealed partial class AuditLog {
extra["removedFromRoles"] = removedFromRoles;
}
return AddItem(AuditLogEventType.UserRolesChanged, user.Id, extra);
return AddItem(AuditLogEventType.UserDeleted, user.Id, extra);
}
public Task AddUserDeletedEvent(IdentityUser user) {

View File

@ -2,6 +2,8 @@
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Server;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Phantom.Server.Web.Identity.Authentication;

View File

@ -1,7 +1,10 @@
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Identity;
using Microsoft.Extensions.DependencyInjection;
using Phantom.Server.Database;
using Phantom.Server.Web.Identity.Authentication;
using Phantom.Server.Web.Identity.Authorization;

View File

@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.AspNetCore.Http;
using Phantom.Server.Web.Identity.Authentication;
using Phantom.Server.Web.Identity.Interfaces;

View File

@ -68,14 +68,10 @@ public sealed class Table<TRow, TKey> : IReadOnlyList<TRow>, IReadOnlyDictionary
}
}
public List<TRow>.Enumerator GetEnumerator() {
public IEnumerator<TRow> GetEnumerator() {
return rowList.GetEnumerator();
}
IEnumerator<TRow> IEnumerable<TRow>.GetEnumerator() {
return GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}

View File

@ -19,7 +19,7 @@ public abstract class MessageHandler<TListener> {
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
cancellationToken.ThrowIfCancellationRequested();
taskManager.Run("Handle message " + message.GetType().Name, async () => {
taskManager.Run("Handle message {Type}" + message.GetType().Name, async () => {
try {
await Handle<TMessage, TReply>(sequenceId, message);
} catch (Exception e) {

View File

@ -8,7 +8,7 @@ static class MessageSerializer {
private static readonly MemoryPackSerializerOptions SerializerOptions = MemoryPackSerializerOptions.Utf8;
public static byte[] Serialize<T>(T message) {
return MemoryPackSerializer.Serialize(message, SerializerOptions);
return MemoryPackSerializer.Serialize(typeof(T), message, SerializerOptions);
}
public static void Serialize<T>(IBufferWriter<byte> destination, T message) {

View File

@ -8,18 +8,9 @@ public abstract class CancellableBackgroundTask {
protected ILogger Logger { get; }
protected CancellationToken CancellationToken { get; }
private readonly TaskManager taskManager;
private readonly string taskName;
protected CancellableBackgroundTask(ILogger logger, TaskManager taskManager, string taskName) {
this.Logger = logger;
this.CancellationToken = cancellationTokenSource.Token;
this.taskManager = taskManager;
this.taskName = taskName;
}
protected void Start() {
taskManager.Run(taskName, Run);
}
@ -34,15 +25,12 @@ public abstract class CancellableBackgroundTask {
Logger.Fatal(e, "Caught exception in task.");
} finally {
cancellationTokenSource.Dispose();
Dispose();
Logger.Debug("Task stopped.");
}
}
protected abstract Task RunTask();
protected abstract void Dispose();
public void Stop() {
try {
cancellationTokenSource.Cancel();