mirror of
https://github.com/chylex/Minecraft-Phantom-Panel.git
synced 2024-11-25 16:42:54 +01:00
Compare commits
2 Commits
c8a2a539e8
...
3c10e1a8f9
Author | SHA1 | Date | |
---|---|---|---|
3c10e1a8f9 | |||
f4aec6f11d |
@ -15,29 +15,24 @@ public sealed class RpcServerConnection {
|
|||||||
this.replyTracker = replyTracker;
|
this.replyTracker = replyTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToServer<TReply> {
|
|
||||||
return MessageRegistries.ToServer.Write<TMessage, TReply>(message).ToArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
|
internal async Task Send<TMessage>(TMessage message) where TMessage : IMessageToServer {
|
||||||
var bytes = WriteBytes<TMessage, NoReply>(message);
|
var bytes = MessageRegistries.ToServer.Write(message).ToArray();
|
||||||
if (bytes.Length > 0) {
|
if (bytes.Length > 0) {
|
||||||
await socket.SendAsync(bytes);
|
await socket.SendAsync(bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
internal async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
||||||
var sequenceId = replyTracker.RegisterReply();
|
var sequenceId = replyTracker.RegisterReply();
|
||||||
var message = messageFactory(sequenceId);
|
|
||||||
|
|
||||||
var bytes = WriteBytes<TMessage, TReply>(message);
|
var bytes = MessageRegistries.ToServer.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
||||||
if (bytes.Length == 0) {
|
if (bytes.Length == 0) {
|
||||||
replyTracker.ForgetReply(sequenceId);
|
replyTracker.ForgetReply(sequenceId);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
await socket.SendAsync(bytes);
|
await socket.SendAsync(bytes);
|
||||||
return await replyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
|
return await replyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Receive(ReplyMessage message) {
|
public void Receive(ReplyMessage message) {
|
||||||
|
@ -28,7 +28,7 @@ public static class ServerMessaging {
|
|||||||
return CurrentConnectionOrThrow.Send(message);
|
return CurrentConnectionOrThrow.Send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
public static Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToServer<TReply> where TReply : class {
|
||||||
return CurrentConnectionOrThrow.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
return CurrentConnectionOrThrow.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,10 +85,10 @@ sealed class Instance : IDisposable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<LaunchInstanceResult> Launch(CancellationToken cancellationToken) {
|
public async Task<LaunchInstanceResult> Launch(CancellationToken shutdownCancellationToken) {
|
||||||
await stateTransitioningActionSemaphore.WaitAsync(cancellationToken);
|
await stateTransitioningActionSemaphore.WaitAsync(shutdownCancellationToken);
|
||||||
try {
|
try {
|
||||||
return TransitionStateAndReturn(currentState.Launch(new InstanceContextImpl(this)));
|
return TransitionStateAndReturn(currentState.Launch(new InstanceContextImpl(this, shutdownCancellationToken)));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Caught exception while launching instance.");
|
logger.Error(e, "Caught exception while launching instance.");
|
||||||
return LaunchInstanceResult.UnknownError;
|
return LaunchInstanceResult.UnknownError;
|
||||||
@ -126,10 +126,13 @@ sealed class Instance : IDisposable {
|
|||||||
|
|
||||||
private sealed class InstanceContextImpl : InstanceContext {
|
private sealed class InstanceContextImpl : InstanceContext {
|
||||||
private readonly Instance instance;
|
private readonly Instance instance;
|
||||||
|
private readonly CancellationToken shutdownCancellationToken;
|
||||||
|
|
||||||
private int statusUpdateCounter;
|
private int statusUpdateCounter;
|
||||||
|
|
||||||
public InstanceContextImpl(Instance instance) : base(instance.Configuration, instance.Launcher) {
|
public InstanceContextImpl(Instance instance, CancellationToken shutdownCancellationToken) : base(instance.Configuration, instance.Launcher) {
|
||||||
this.instance = instance;
|
this.instance = instance;
|
||||||
|
this.shutdownCancellationToken = shutdownCancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
public override LaunchServices LaunchServices => instance.launchServices;
|
public override LaunchServices LaunchServices => instance.launchServices;
|
||||||
@ -148,10 +151,20 @@ sealed class Instance : IDisposable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void TransitionState(Func<IInstanceState> newState) {
|
public override void TransitionState(Func<(IInstanceState, IInstanceStatus?)> newStateAndStatus) {
|
||||||
instance.stateTransitioningActionSemaphore.Wait();
|
instance.stateTransitioningActionSemaphore.Wait(CancellationToken.None);
|
||||||
try {
|
try {
|
||||||
instance.TransitionState(newState());
|
var (state, status) = newStateAndStatus();
|
||||||
|
if (state is not InstanceNotRunningState && shutdownCancellationToken.IsCancellationRequested) {
|
||||||
|
instance.logger.Verbose("Cancelled state transition to {State} due to Agent shutdown.", state.GetType().Name);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status != null) {
|
||||||
|
ReportStatus(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
instance.TransitionState(state);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
instance.logger.Error(e, "Caught exception during state transition.");
|
instance.logger.Error(e, "Caught exception during state transition.");
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -20,9 +20,9 @@ abstract class InstanceContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public abstract void ReportStatus(IInstanceStatus newStatus);
|
public abstract void ReportStatus(IInstanceStatus newStatus);
|
||||||
public abstract void TransitionState(Func<IInstanceState> newState);
|
public abstract void TransitionState(Func<(IInstanceState, IInstanceStatus?)> newStateAndStatus);
|
||||||
|
|
||||||
public void TransitionState(IInstanceState newState) {
|
public void TransitionState(IInstanceState newState, IInstanceStatus? newStatus = null) {
|
||||||
TransitionState(() => newState);
|
TransitionState(() => (newState, newStatus));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,36 +41,36 @@ sealed class InstanceSessionManager : IDisposable {
|
|||||||
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
this.shutdownCancellationToken = shutdownCancellationTokenSource.Token;
|
||||||
}
|
}
|
||||||
|
|
||||||
[SuppressMessage("ReSharper", "ConvertIfStatementToReturnStatement")]
|
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRun<T>(Func<Task<InstanceActionResult<T>>> func) {
|
||||||
private async Task<InstanceActionResult<T>> AcquireSemaphoreAndRunWithInstance<T>(Guid instanceGuid, Func<Instance, Task<T>> func) {
|
|
||||||
try {
|
try {
|
||||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
await semaphore.WaitAsync(shutdownCancellationToken);
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await func();
|
||||||
|
} finally {
|
||||||
|
semaphore.Release();
|
||||||
|
}
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
return InstanceActionResult.General<T>(InstanceActionGeneralResult.AgentShuttingDown);
|
return InstanceActionResult.General<T>(InstanceActionGeneralResult.AgentShuttingDown);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
[SuppressMessage("ReSharper", "ConvertIfStatementToReturnStatement")]
|
||||||
if (!instances.TryGetValue(instanceGuid, out var instance)) {
|
private Task<InstanceActionResult<T>> AcquireSemaphoreAndRunWithInstance<T>(Guid instanceGuid, Func<Instance, Task<T>> func) {
|
||||||
return InstanceActionResult.General<T>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
return AcquireSemaphoreAndRun(async () => {
|
||||||
}
|
if (instances.TryGetValue(instanceGuid, out var instance)) {
|
||||||
else {
|
|
||||||
return InstanceActionResult.Concrete(await func(instance));
|
return InstanceActionResult.Concrete(await func(instance));
|
||||||
}
|
}
|
||||||
} finally {
|
else {
|
||||||
semaphore.Release();
|
return InstanceActionResult.General<T>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration) {
|
public async Task<InstanceActionResult<ConfigureInstanceResult>> Configure(InstanceConfiguration configuration) {
|
||||||
try {
|
return await AcquireSemaphoreAndRun(async () => {
|
||||||
await semaphore.WaitAsync(shutdownCancellationToken);
|
var instanceGuid = configuration.InstanceGuid;
|
||||||
} catch (OperationCanceledException) {
|
|
||||||
return InstanceActionResult.General<ConfigureInstanceResult>(InstanceActionGeneralResult.AgentShuttingDown);
|
|
||||||
}
|
|
||||||
|
|
||||||
var instanceGuid = configuration.InstanceGuid;
|
|
||||||
|
|
||||||
try {
|
|
||||||
var otherInstances = instances.Values.Where(inst => inst.Configuration.InstanceGuid != instanceGuid).ToArray();
|
var otherInstances = instances.Values.Where(inst => inst.Configuration.InstanceGuid != instanceGuid).ToArray();
|
||||||
if (otherInstances.Length + 1 > agentInfo.MaxInstances) {
|
if (otherInstances.Length + 1 > agentInfo.MaxInstances) {
|
||||||
return InstanceActionResult.Concrete(ConfigureInstanceResult.InstanceLimitExceeded);
|
return InstanceActionResult.Concrete(ConfigureInstanceResult.InstanceLimitExceeded);
|
||||||
@ -115,9 +115,7 @@ sealed class InstanceSessionManager : IDisposable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return InstanceActionResult.Concrete(ConfigureInstanceResult.Success);
|
return InstanceActionResult.Concrete(ConfigureInstanceResult.Success);
|
||||||
} finally {
|
});
|
||||||
semaphore.Release();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<InstanceActionResult<LaunchInstanceResult>> Launch(Guid instanceGuid) {
|
public Task<InstanceActionResult<LaunchInstanceResult>> Launch(Guid instanceGuid) {
|
||||||
|
@ -66,11 +66,10 @@ sealed class InstanceLaunchingState : IInstanceState, IDisposable {
|
|||||||
context.TransitionState(() => {
|
context.TransitionState(() => {
|
||||||
if (cancellationTokenSource.IsCancellationRequested) {
|
if (cancellationTokenSource.IsCancellationRequested) {
|
||||||
context.PortManager.Release(context.Configuration);
|
context.PortManager.Release(context.Configuration);
|
||||||
context.ReportStatus(InstanceStatus.NotRunning);
|
return (new InstanceNotRunningState(), InstanceStatus.NotRunning);
|
||||||
return new InstanceNotRunningState();
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return new InstanceRunningState(context, task.Result);
|
return (new InstanceRunningState(context, task.Result), null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -30,8 +30,7 @@ sealed class InstanceRunningState : IInstanceState {
|
|||||||
if (session.HasEnded) {
|
if (session.HasEnded) {
|
||||||
if (sessionObjects.Dispose()) {
|
if (sessionObjects.Dispose()) {
|
||||||
context.Logger.Warning("Session ended immediately after it was started.");
|
context.Logger.Warning("Session ended immediately after it was started.");
|
||||||
context.ReportStatus(InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError));
|
context.LaunchServices.TaskManager.Run(() => context.TransitionState(new InstanceNotRunningState(), InstanceStatus.Failed(InstanceLaunchFailReason.UnknownError)));
|
||||||
context.LaunchServices.TaskManager.Run(() => context.TransitionState(new InstanceNotRunningState()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -52,13 +51,11 @@ sealed class InstanceRunningState : IInstanceState {
|
|||||||
|
|
||||||
if (isStopping) {
|
if (isStopping) {
|
||||||
context.Logger.Information("Session ended.");
|
context.Logger.Information("Session ended.");
|
||||||
context.ReportStatus(InstanceStatus.NotRunning);
|
context.TransitionState(new InstanceNotRunningState(), InstanceStatus.NotRunning);
|
||||||
context.TransitionState(new InstanceNotRunningState());
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
context.Logger.Information("Session ended unexpectedly, restarting...");
|
context.Logger.Information("Session ended unexpectedly, restarting...");
|
||||||
context.ReportStatus(InstanceStatus.Restarting);
|
context.TransitionState(new InstanceLaunchingState(context), InstanceStatus.Restarting);
|
||||||
context.TransitionState(new InstanceLaunchingState(context));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using Phantom.Agent.Minecraft.Command;
|
using System.Diagnostics;
|
||||||
|
using Phantom.Agent.Minecraft.Command;
|
||||||
using Phantom.Agent.Minecraft.Instance;
|
using Phantom.Agent.Minecraft.Instance;
|
||||||
using Phantom.Common.Data.Instance;
|
using Phantom.Common.Data.Instance;
|
||||||
using Phantom.Common.Data.Minecraft;
|
using Phantom.Common.Data.Minecraft;
|
||||||
@ -32,26 +33,27 @@ sealed class InstanceStoppingState : IInstanceState, IDisposable {
|
|||||||
await DoWaitForSessionToEnd();
|
await DoWaitForSessionToEnd();
|
||||||
} finally {
|
} finally {
|
||||||
context.Logger.Information("Session stopped.");
|
context.Logger.Information("Session stopped.");
|
||||||
context.ReportStatus(InstanceStatus.NotRunning);
|
context.TransitionState(new InstanceNotRunningState(), InstanceStatus.NotRunning);
|
||||||
context.TransitionState(new InstanceNotRunningState());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DoSendStopCommand() {
|
private async Task DoSendStopCommand() {
|
||||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
try {
|
try {
|
||||||
await session.SendCommand(MinecraftCommand.Stop, cts.Token);
|
await session.SendCommand(MinecraftCommand.Stop, timeout.Token);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
// ignore
|
// ignore
|
||||||
|
} catch (ObjectDisposedException e) when (e.ObjectName == typeof(Process).FullName && session.HasEnded) {
|
||||||
|
// ignore
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
context.Logger.Warning(e, "Caught exception while sending stop command.");
|
context.Logger.Warning(e, "Caught exception while sending stop command.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DoWaitForSessionToEnd() {
|
private async Task DoWaitForSessionToEnd() {
|
||||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(55));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(55));
|
||||||
try {
|
try {
|
||||||
await session.WaitForExit(cts.Token);
|
await session.WaitForExit(timeout.Token);
|
||||||
} catch (OperationCanceledException) {
|
} catch (OperationCanceledException) {
|
||||||
try {
|
try {
|
||||||
context.Logger.Warning("Waiting timed out, killing session...");
|
context.Logger.Warning("Waiting timed out, killing session...");
|
||||||
|
@ -4,6 +4,4 @@ namespace Phantom.Common.Messages;
|
|||||||
|
|
||||||
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
|
public interface IMessageToAgent<TReply> : IMessage<IMessageToAgentListener, TReply> {}
|
||||||
|
|
||||||
public interface IMessageToAgent : IMessageToAgent<NoReply> {
|
public interface IMessageToAgent : IMessageToAgent<NoReply> {}
|
||||||
uint IMessage<IMessageToAgentListener, NoReply>.SequenceId => 0;
|
|
||||||
}
|
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
using Phantom.Utils.Rpc.Message;
|
using Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
namespace Phantom.Common.Messages;
|
namespace Phantom.Common.Messages;
|
||||||
|
|
||||||
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
|
public interface IMessageToServer<TReply> : IMessage<IMessageToServerListener, TReply> {}
|
||||||
|
|
||||||
public interface IMessageToServer : IMessageToServer<NoReply> {
|
public interface IMessageToServer : IMessageToServer<NoReply> {}
|
||||||
uint IMessage<IMessageToServerListener, NoReply>.SequenceId => 0;
|
|
||||||
}
|
|
||||||
|
@ -12,20 +12,20 @@ public static class MessageRegistries {
|
|||||||
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
|
public static MessageRegistry<IMessageToServerListener> ToServer { get; } = new (PhantomLogger.Create("MessageRegistry:ToServer"));
|
||||||
|
|
||||||
static MessageRegistries() {
|
static MessageRegistries() {
|
||||||
ToAgent.Add<RegisterAgentSuccessMessage, NoReply>(0);
|
ToAgent.Add<RegisterAgentSuccessMessage>(0);
|
||||||
ToAgent.Add<RegisterAgentFailureMessage, NoReply>(1);
|
ToAgent.Add<RegisterAgentFailureMessage>(1);
|
||||||
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
|
ToAgent.Add<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(2);
|
||||||
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
|
ToAgent.Add<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(3);
|
||||||
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
|
ToAgent.Add<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(4);
|
||||||
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
|
ToAgent.Add<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(5);
|
||||||
ToAgent.Add<ReplyMessage, NoReply>(127);
|
ToAgent.Add<ReplyMessage>(127);
|
||||||
|
|
||||||
ToServer.Add<RegisterAgentMessage, NoReply>(0);
|
ToServer.Add<RegisterAgentMessage>(0);
|
||||||
ToServer.Add<UnregisterAgentMessage, NoReply>(1);
|
ToServer.Add<UnregisterAgentMessage>(1);
|
||||||
ToServer.Add<AgentIsAliveMessage, NoReply>(2);
|
ToServer.Add<AgentIsAliveMessage>(2);
|
||||||
ToServer.Add<AdvertiseJavaRuntimesMessage, NoReply>(3);
|
ToServer.Add<AdvertiseJavaRuntimesMessage>(3);
|
||||||
ToServer.Add<ReportInstanceStatusMessage, NoReply>(4);
|
ToServer.Add<ReportInstanceStatusMessage>(4);
|
||||||
ToServer.Add<InstanceOutputMessage, NoReply>(5);
|
ToServer.Add<InstanceOutputMessage>(5);
|
||||||
ToServer.Add<ReplyMessage, NoReply>(127);
|
ToServer.Add<ReplyMessage>(127);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,8 +6,7 @@ namespace Phantom.Common.Messages.ToAgent;
|
|||||||
|
|
||||||
[MemoryPackable]
|
[MemoryPackable]
|
||||||
public sealed partial record ConfigureInstanceMessage(
|
public sealed partial record ConfigureInstanceMessage(
|
||||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
[property: MemoryPackOrder(0)] InstanceConfiguration Configuration
|
||||||
[property: MemoryPackOrder(1)] InstanceConfiguration Configuration
|
|
||||||
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
|
) : IMessageToAgent<InstanceActionResult<ConfigureInstanceResult>> {
|
||||||
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
|
public Task<InstanceActionResult<ConfigureInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||||
return listener.HandleConfigureInstance(this);
|
return listener.HandleConfigureInstance(this);
|
||||||
|
@ -5,8 +5,7 @@ namespace Phantom.Common.Messages.ToAgent;
|
|||||||
|
|
||||||
[MemoryPackable]
|
[MemoryPackable]
|
||||||
public sealed partial record LaunchInstanceMessage(
|
public sealed partial record LaunchInstanceMessage(
|
||||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
[property: MemoryPackOrder(0)] Guid InstanceGuid
|
||||||
[property: MemoryPackOrder(1)] Guid InstanceGuid
|
|
||||||
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
|
) : IMessageToAgent<InstanceActionResult<LaunchInstanceResult>> {
|
||||||
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
|
public Task<InstanceActionResult<LaunchInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||||
return listener.HandleLaunchInstance(this);
|
return listener.HandleLaunchInstance(this);
|
||||||
|
@ -5,9 +5,8 @@ namespace Phantom.Common.Messages.ToAgent;
|
|||||||
|
|
||||||
[MemoryPackable]
|
[MemoryPackable]
|
||||||
public sealed partial record SendCommandToInstanceMessage(
|
public sealed partial record SendCommandToInstanceMessage(
|
||||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
[property: MemoryPackOrder(1)] string Command
|
||||||
[property: MemoryPackOrder(2)] string Command
|
|
||||||
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
|
) : IMessageToAgent<InstanceActionResult<SendCommandToInstanceResult>> {
|
||||||
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
|
public Task<InstanceActionResult<SendCommandToInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||||
return listener.HandleSendCommandToInstance(this);
|
return listener.HandleSendCommandToInstance(this);
|
||||||
|
@ -6,9 +6,8 @@ namespace Phantom.Common.Messages.ToAgent;
|
|||||||
|
|
||||||
[MemoryPackable]
|
[MemoryPackable]
|
||||||
public sealed partial record StopInstanceMessage(
|
public sealed partial record StopInstanceMessage(
|
||||||
[property: MemoryPackOrder(0)] uint SequenceId,
|
[property: MemoryPackOrder(0)] Guid InstanceGuid,
|
||||||
[property: MemoryPackOrder(1)] Guid InstanceGuid,
|
[property: MemoryPackOrder(1)] MinecraftStopStrategy StopStrategy
|
||||||
[property: MemoryPackOrder(2)] MinecraftStopStrategy StopStrategy
|
|
||||||
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
|
) : IMessageToAgent<InstanceActionResult<StopInstanceResult>> {
|
||||||
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
|
public Task<InstanceActionResult<StopInstanceResult>> Accept(IMessageToAgentListener listener) {
|
||||||
return listener.HandleStopInstance(this);
|
return listener.HandleStopInstance(this);
|
||||||
|
@ -34,29 +34,32 @@ public sealed class RpcClientConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] WriteBytes<TMessage, TReply>(TMessage message) where TMessage : IMessageToAgent<TReply> {
|
|
||||||
return isClosed ? Array.Empty<byte>() : MessageRegistries.ToAgent.Write<TMessage, TReply>(message).ToArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
public async Task Send<TMessage>(TMessage message) where TMessage : IMessageToAgent {
|
||||||
var bytes = WriteBytes<TMessage, NoReply>(message);
|
if (isClosed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var bytes = MessageRegistries.ToAgent.Write(message).ToArray();
|
||||||
if (bytes.Length > 0) {
|
if (bytes.Length > 0) {
|
||||||
await socket.SendAsync(routingId, bytes);
|
await socket.SendAsync(routingId, bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
public async Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||||
var sequenceId = messageReplyTracker.RegisterReply();
|
if (isClosed) {
|
||||||
var message = messageFactory(sequenceId);
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
var bytes = WriteBytes<TMessage, TReply>(message);
|
var sequenceId = messageReplyTracker.RegisterReply();
|
||||||
|
|
||||||
|
var bytes = MessageRegistries.ToAgent.Write<TMessage, TReply>(sequenceId, message).ToArray();
|
||||||
if (bytes.Length == 0) {
|
if (bytes.Length == 0) {
|
||||||
messageReplyTracker.ForgetReply(sequenceId);
|
messageReplyTracker.ForgetReply(sequenceId);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
await socket.SendAsync(routingId, bytes);
|
await socket.SendAsync(routingId, bytes);
|
||||||
return await messageReplyTracker.WaitForReply<TReply>(message.SequenceId, waitForReplyTime, cancellationToken);
|
return await messageReplyTracker.WaitForReply<TReply>(sequenceId, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Receive(ReplyMessage message) {
|
public void Receive(ReplyMessage message) {
|
||||||
|
@ -22,7 +22,7 @@ sealed class AgentConnection {
|
|||||||
return connection.Send(message);
|
return connection.Send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<TReply?> Send<TMessage, TReply>(Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
public Task<TReply?> Send<TMessage, TReply>(TMessage message, TimeSpan waitForReplyTime, CancellationToken cancellationToken) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||||
return connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
return connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,14 +113,14 @@ public sealed class AgentManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, Func<uint, TMessage> messageFactory, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
internal async Task<TReply?> SendMessage<TMessage, TReply>(Guid guid, TMessage message, TimeSpan waitForReplyTime) where TMessage : IMessageToAgent<TReply> where TReply : class {
|
||||||
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
|
var connection = agents.ByGuid.TryGetValue(guid, out var agent) ? agent.Connection : null;
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
// TODO handle missing agent?
|
// TODO handle missing agent?
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return await connection.Send<TMessage, TReply>(messageFactory, waitForReplyTime, cancellationToken);
|
return await connection.Send<TMessage, TReply>(message, waitForReplyTime, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
|
private sealed class ObservableAgents : ObservableState<ImmutableArray<Agent>> {
|
||||||
|
@ -3,6 +3,7 @@ using Phantom.Common.Data.Instance;
|
|||||||
using Phantom.Common.Data.Minecraft;
|
using Phantom.Common.Data.Minecraft;
|
||||||
using Phantom.Common.Data.Replies;
|
using Phantom.Common.Data.Replies;
|
||||||
using Phantom.Common.Logging;
|
using Phantom.Common.Logging;
|
||||||
|
using Phantom.Common.Messages;
|
||||||
using Phantom.Common.Messages.ToAgent;
|
using Phantom.Common.Messages.ToAgent;
|
||||||
using Phantom.Common.Minecraft;
|
using Phantom.Common.Minecraft;
|
||||||
using Phantom.Server.Database;
|
using Phantom.Server.Database;
|
||||||
@ -67,7 +68,7 @@ public sealed class InstanceManager {
|
|||||||
|
|
||||||
var agentName = agent.Name;
|
var agentName = agent.Name;
|
||||||
|
|
||||||
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, sequenceId => new ConfigureInstanceMessage(sequenceId, configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
|
var reply = (await agentManager.SendMessage<ConfigureInstanceMessage, InstanceActionResult<ConfigureInstanceResult>>(configuration.AgentGuid, new ConfigureInstanceMessage(configuration), TimeSpan.FromSeconds(10))).DidNotReplyIfNull();
|
||||||
if (reply.Is(ConfigureInstanceResult.Success)) {
|
if (reply.Is(ConfigureInstanceResult.Success)) {
|
||||||
using (var scope = databaseProvider.CreateScope()) {
|
using (var scope = databaseProvider.CreateScope()) {
|
||||||
InstanceEntity entity = scope.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
|
InstanceEntity entity = scope.Ctx.InstanceUpsert.Fetch(configuration.InstanceGuid);
|
||||||
@ -119,6 +120,11 @@ public sealed class InstanceManager {
|
|||||||
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
|
instances.ByGuid.ReplaceAllIf(instance => instance with { Status = instanceStatus }, instance => instance.Configuration.AgentGuid == agentGuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task<InstanceActionResult<TReply>> SendInstanceActionMessage<TMessage, TReply>(Instance instance, TMessage message) where TMessage : IMessageToAgent<InstanceActionResult<TReply>> {
|
||||||
|
var reply = await agentManager.SendMessage<TMessage, InstanceActionResult<TReply>>(instance.Configuration.AgentGuid, message, TimeSpan.FromSeconds(10));
|
||||||
|
return reply.DidNotReplyIfNull();
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid instanceGuid) {
|
public async Task<InstanceActionResult<LaunchInstanceResult>> LaunchInstance(Guid instanceGuid) {
|
||||||
var instance = GetInstance(instanceGuid);
|
var instance = GetInstance(instanceGuid);
|
||||||
if (instance == null) {
|
if (instance == null) {
|
||||||
@ -127,8 +133,7 @@ public sealed class InstanceManager {
|
|||||||
|
|
||||||
await SetInstanceShouldLaunchAutomatically(instanceGuid, true);
|
await SetInstanceShouldLaunchAutomatically(instanceGuid, true);
|
||||||
|
|
||||||
var reply = await agentManager.SendMessage<LaunchInstanceMessage, InstanceActionResult<LaunchInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new LaunchInstanceMessage(sequenceId, instanceGuid), TimeSpan.FromSeconds(10));
|
return await SendInstanceActionMessage<LaunchInstanceMessage, LaunchInstanceResult>(instance, new LaunchInstanceMessage(instanceGuid));
|
||||||
return reply.DidNotReplyIfNull();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
public async Task<InstanceActionResult<StopInstanceResult>> StopInstance(Guid instanceGuid, MinecraftStopStrategy stopStrategy) {
|
||||||
@ -139,8 +144,7 @@ public sealed class InstanceManager {
|
|||||||
|
|
||||||
await SetInstanceShouldLaunchAutomatically(instanceGuid, false);
|
await SetInstanceShouldLaunchAutomatically(instanceGuid, false);
|
||||||
|
|
||||||
var reply = await agentManager.SendMessage<StopInstanceMessage, InstanceActionResult<StopInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new StopInstanceMessage(sequenceId, instanceGuid, stopStrategy), TimeSpan.FromSeconds(10));
|
return await SendInstanceActionMessage<StopInstanceMessage, StopInstanceResult>(instance, new StopInstanceMessage(instanceGuid, stopStrategy));
|
||||||
return reply.DidNotReplyIfNull();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task SetInstanceShouldLaunchAutomatically(Guid instanceGuid, bool shouldLaunchAutomatically) {
|
private async Task SetInstanceShouldLaunchAutomatically(Guid instanceGuid, bool shouldLaunchAutomatically) {
|
||||||
@ -162,8 +166,7 @@ public sealed class InstanceManager {
|
|||||||
return InstanceActionResult.General<SendCommandToInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
return InstanceActionResult.General<SendCommandToInstanceResult>(InstanceActionGeneralResult.InstanceDoesNotExist);
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply = await agentManager.SendMessage<SendCommandToInstanceMessage, InstanceActionResult<SendCommandToInstanceResult>>(instance.Configuration.AgentGuid, sequenceId => new SendCommandToInstanceMessage(sequenceId, instanceGuid, command), TimeSpan.FromSeconds(10));
|
return await SendInstanceActionMessage<SendCommandToInstanceMessage, SendCommandToInstanceResult>(instance, new SendCommandToInstanceMessage(instanceGuid, command));
|
||||||
return reply.DidNotReplyIfNull();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal ImmutableArray<InstanceConfiguration> GetInstanceConfigurationsForAgent(Guid agentGuid) {
|
internal ImmutableArray<InstanceConfiguration> GetInstanceConfigurationsForAgent(Guid agentGuid) {
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
namespace Phantom.Utils.Rpc.Message;
|
namespace Phantom.Utils.Rpc.Message;
|
||||||
|
|
||||||
public interface IMessage<TListener, TReply> {
|
public interface IMessage<TListener, TReply> {
|
||||||
public uint SequenceId { get; }
|
|
||||||
Task<TReply> Accept(TListener listener);
|
Task<TReply> Accept(TListener listener);
|
||||||
}
|
}
|
||||||
|
@ -17,22 +17,22 @@ public abstract class MessageHandler<TListener> {
|
|||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void Enqueue<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
internal void Enqueue<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
taskManager.Run(async () => {
|
taskManager.Run(async () => {
|
||||||
try {
|
try {
|
||||||
await Handle<TMessage, TReply>(message);
|
await Handle<TMessage, TReply>(sequenceId, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
logger.Error(e, "Failed to handle message {Type}.", message.GetType().Name);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task Handle<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
private async Task Handle<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
TReply reply = await message.Accept(Listener);
|
TReply reply = await message.Accept(Listener);
|
||||||
|
|
||||||
if (reply is not NoReply) {
|
if (reply is not NoReply) {
|
||||||
await SendReply(message.SequenceId, MessageSerializer.Serialize(reply));
|
await SendReply(sequenceId, MessageSerializer.Serialize(reply));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,10 +17,23 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Add<TMessage>(ushort code) where TMessage : IMessage<TListener, NoReply> {
|
||||||
|
AddTypeCodeMapping<TMessage, NoReply>(code);
|
||||||
|
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage>);
|
||||||
|
}
|
||||||
|
|
||||||
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
|
public void Add<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
|
||||||
|
if (typeof(TReply) == typeof(NoReply)) {
|
||||||
|
throw new InvalidOperationException("This overload of Add must not be used with NoReply as the reply type!");
|
||||||
|
}
|
||||||
|
|
||||||
|
AddTypeCodeMapping<TMessage, TReply>(code);
|
||||||
|
codeToHandlerMapping.Add(code, DeserializationHandler<TMessage, TReply>);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void AddTypeCodeMapping<TMessage, TReply>(ushort code) where TMessage : IMessage<TListener, TReply> {
|
||||||
typeToCodeMapping.Add(typeof(TMessage), code);
|
typeToCodeMapping.Add(typeof(TMessage), code);
|
||||||
codeToTypeMapping.Add(code, typeof(TMessage));
|
codeToTypeMapping.Add(code, typeof(TMessage));
|
||||||
codeToHandlerMapping.Add(code, HandleInternal<TMessage, TReply>);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
public bool TryGetType(ReadOnlyMemory<byte> data, [NotNullWhen(true)] out Type? type) {
|
||||||
@ -34,10 +47,10 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
public ReadOnlySpan<byte> Write<TMessage>(TMessage message) where TMessage : IMessage<TListener, NoReply> {
|
||||||
return Write<TMessage, NoReply>(message);
|
return Write<TMessage, NoReply>(0, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReadOnlySpan<byte> Write<TMessage, TReply>(TMessage message) where TMessage : IMessage<TListener, TReply> {
|
public ReadOnlySpan<byte> Write<TMessage, TReply>(uint sequenceId, TMessage message) where TMessage : IMessage<TListener, TReply> {
|
||||||
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
if (!typeToCodeMapping.TryGetValue(typeof(TMessage), out ushort code)) {
|
||||||
logger.Error("Unknown message type {Type}.", typeof(TMessage));
|
logger.Error("Unknown message type {Type}.", typeof(TMessage));
|
||||||
return default;
|
return default;
|
||||||
@ -47,6 +60,11 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
MessageSerializer.WriteCode(buffer, code);
|
MessageSerializer.WriteCode(buffer, code);
|
||||||
|
|
||||||
|
if (typeof(TReply) != typeof(NoReply)) {
|
||||||
|
MessageSerializer.WriteSequenceId(buffer, sequenceId);
|
||||||
|
}
|
||||||
|
|
||||||
MessageSerializer.Serialize(buffer, message);
|
MessageSerializer.Serialize(buffer, message);
|
||||||
|
|
||||||
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
|
if (buffer.WrittenCount > DefaultBufferSize && logger.IsEnabled(LogEventLevel.Verbose)) {
|
||||||
@ -77,7 +95,23 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
handle(data, code, handler);
|
handle(data, code, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleInternal<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
|
private void DeserializationHandler<TMessage>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, NoReply> {
|
||||||
|
DeserializeAndEnqueueMessage<TMessage, NoReply>(data, code, handler, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void DeserializationHandler<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler) where TMessage : IMessage<TListener, TReply> {
|
||||||
|
uint sequenceId;
|
||||||
|
try {
|
||||||
|
sequenceId = MessageSerializer.ReadSequenceId(ref data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.Error(e, "Failed to deserialize sequence ID of message with code {Code}.", code);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DeserializeAndEnqueueMessage<TMessage, TReply>(data, code, handler, sequenceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void DeserializeAndEnqueueMessage<TMessage, TReply>(ReadOnlyMemory<byte> data, ushort code, MessageHandler<TListener> handler, uint sequenceId) where TMessage : IMessage<TListener, TReply> {
|
||||||
TMessage message;
|
TMessage message;
|
||||||
try {
|
try {
|
||||||
message = MessageSerializer.Deserialize<TMessage>(data);
|
message = MessageSerializer.Deserialize<TMessage>(data);
|
||||||
@ -86,6 +120,6 @@ public sealed class MessageRegistry<TListener> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.Enqueue<TMessage, TReply>(message);
|
handler.Enqueue<TMessage, TReply>(sequenceId, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,4 +30,16 @@ static class MessageSerializer {
|
|||||||
memory = memory[2..];
|
memory = memory[2..];
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void WriteSequenceId(IBufferWriter<byte> destination, uint sequenceId) {
|
||||||
|
Span<byte> buffer = stackalloc byte[4];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(buffer, sequenceId);
|
||||||
|
destination.Write(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static uint ReadSequenceId(ref ReadOnlyMemory<byte> memory) {
|
||||||
|
uint value = BinaryPrimitives.ReadUInt32LittleEndian(memory.Span);
|
||||||
|
memory = memory[4..];
|
||||||
|
return value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user