1
0
mirror of https://github.com/chylex/Discord-History-Tracker.git synced 2024-10-19 05:42:50 +02:00

Compare commits

..

2 Commits

Author SHA1 Message Date
031d521402
Convert attachment download events to reactive 2023-12-28 17:33:30 +01:00
0131f8cb50
Refactor integrated server management 2023-12-28 17:33:30 +01:00
6 changed files with 42 additions and 34 deletions

View File

@ -21,6 +21,7 @@
<PackageReference Include="Avalonia.Desktop" Version="11.0.6" /> <PackageReference Include="Avalonia.Desktop" Version="11.0.6" />
<PackageReference Include="Avalonia.Diagnostics" Version="11.0.6" Condition=" '$(Configuration)' == 'Debug' " /> <PackageReference Include="Avalonia.Diagnostics" Version="11.0.6" Condition=" '$(Configuration)' == 'Debug' " />
<PackageReference Include="Avalonia.Fonts.Inter" Version="11.0.6" /> <PackageReference Include="Avalonia.Fonts.Inter" Version="11.0.6" />
<PackageReference Include="Avalonia.ReactiveUI" Version="11.0.6" />
<PackageReference Include="Avalonia.Themes.Fluent" Version="11.0.6" /> <PackageReference Include="Avalonia.Themes.Fluent" Version="11.0.6" />
</ItemGroup> </ItemGroup>

View File

@ -1,9 +1,9 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel; using System.ComponentModel;
using System.Threading; using System.Reactive.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Avalonia.Threading; using Avalonia.ReactiveUI;
using DHT.Desktop.Common; using DHT.Desktop.Common;
using DHT.Desktop.Main.Controls; using DHT.Desktop.Main.Controls;
using DHT.Server; using DHT.Server;
@ -11,7 +11,6 @@ using DHT.Server.Data;
using DHT.Server.Data.Aggregations; using DHT.Server.Data.Aggregations;
using DHT.Server.Data.Filters; using DHT.Server.Data.Filters;
using DHT.Server.Database; using DHT.Server.Database;
using DHT.Server.Download;
using DHT.Utils.Models; using DHT.Utils.Models;
using DHT.Utils.Tasks; using DHT.Utils.Tasks;
@ -61,6 +60,7 @@ sealed class AttachmentsPageModel : BaseModel, IDisposable {
private readonly State state; private readonly State state;
private readonly AsyncValueComputer<DownloadStatusStatistics>.Single downloadStatisticsComputer; private readonly AsyncValueComputer<DownloadStatusStatistics>.Single downloadStatisticsComputer;
private IDisposable? finishedItemsSubscription;
private int doneItemsCount; private int doneItemsCount;
private int initialFinishedCount; private int initialFinishedCount;
private int? totalItemsToDownloadCount; private int? totalItemsToDownloadCount;
@ -76,12 +76,11 @@ sealed class AttachmentsPageModel : BaseModel, IDisposable {
downloadStatisticsComputer.Recompute(); downloadStatisticsComputer.Recompute();
state.Db.Statistics.PropertyChanged += OnDbStatisticsChanged; state.Db.Statistics.PropertyChanged += OnDbStatisticsChanged;
state.Downloader.OnItemFinished += DownloaderOnOnItemFinished;
} }
public void Dispose() { public void Dispose() {
state.Db.Statistics.PropertyChanged -= OnDbStatisticsChanged; state.Db.Statistics.PropertyChanged -= OnDbStatisticsChanged;
state.Downloader.OnItemFinished -= DownloaderOnOnItemFinished; finishedItemsSubscription?.Dispose();
FilterModel.Dispose(); FilterModel.Dispose();
} }
@ -139,19 +138,22 @@ sealed class AttachmentsPageModel : BaseModel, IDisposable {
OnPropertyChanged(nameof(DownloadProgress)); OnPropertyChanged(nameof(DownloadProgress));
} }
private void DownloaderOnOnItemFinished(object? sender, DownloadItem e) { private void OnItemsFinished(int finishedItemCount) {
Interlocked.Increment(ref doneItemsCount); doneItemsCount += finishedItemCount;
UpdateDownloadMessage();
Dispatcher.UIThread.Invoke(UpdateDownloadMessage);
downloadStatisticsComputer.Recompute(); downloadStatisticsComputer.Recompute();
} }
public async Task OnClickToggleDownload() { public async Task OnClickToggleDownload() {
if (IsDownloading) {
IsToggleDownloadButtonEnabled = false; IsToggleDownloadButtonEnabled = false;
if (IsDownloading) {
await state.Downloader.Stop(); await state.Downloader.Stop();
finishedItemsSubscription?.Dispose();
finishedItemsSubscription = null;
downloadStatisticsComputer.Recompute(); downloadStatisticsComputer.Recompute();
IsToggleDownloadButtonEnabled = true;
state.Db.RemoveDownloadItems(EnqueuedItemFilter, FilterRemovalMode.RemoveMatching); state.Db.RemoveDownloadItems(EnqueuedItemFilter, FilterRemovalMode.RemoveMatching);
@ -161,13 +163,22 @@ sealed class AttachmentsPageModel : BaseModel, IDisposable {
UpdateDownloadMessage(); UpdateDownloadMessage();
} }
else { else {
var finishedItems = await state.Downloader.Start();
initialFinishedCount = statisticsDownloaded.Items + statisticsFailed.Items; initialFinishedCount = statisticsDownloaded.Items + statisticsFailed.Items;
await state.Downloader.Start(); finishedItemsSubscription = finishedItems.Select(static _ => true)
.Buffer(TimeSpan.FromMilliseconds(100))
.Select(static items => items.Count)
.Where(static items => items > 0)
.ObserveOn(AvaloniaScheduler.Instance)
.Subscribe(OnItemsFinished);
EnqueueDownloadItems(); EnqueueDownloadItems();
} }
OnPropertyChanged(nameof(ToggleDownloadButtonText)); OnPropertyChanged(nameof(ToggleDownloadButtonText));
OnPropertyChanged(nameof(IsDownloading)); OnPropertyChanged(nameof(IsDownloading));
IsToggleDownloadButtonEnabled = true;
} }
public void OnClickRetryFailedDownloads() { public void OnClickRetryFailedDownloads() {

View File

@ -9,8 +9,6 @@ public sealed class Downloader {
private DownloaderTask? current; private DownloaderTask? current;
public bool IsDownloading => current != null; public bool IsDownloading => current != null;
public event EventHandler<DownloadItem>? OnItemFinished;
private readonly IDatabaseFile db; private readonly IDatabaseFile db;
private readonly SemaphoreSlim semaphore = new (1, 1); private readonly SemaphoreSlim semaphore = new (1, 1);
@ -18,13 +16,11 @@ public sealed class Downloader {
this.db = db; this.db = db;
} }
public async Task Start() { public async Task<IObservable<DownloadItem>> Start() {
await semaphore.WaitAsync(); await semaphore.WaitAsync();
try { try {
if (current == null) { current ??= new DownloaderTask(db);
current = new DownloaderTask(db); return current.FinishedItems;
current.OnItemFinished += DelegateOnItemFinished;
}
} finally { } finally {
semaphore.Release(); semaphore.Release();
} }
@ -34,16 +30,11 @@ public sealed class Downloader {
await semaphore.WaitAsync(); await semaphore.WaitAsync();
try { try {
if (current != null) { if (current != null) {
await current.Stop(); await current.DisposeAsync();
current.OnItemFinished -= DelegateOnItemFinished;
current = null; current = null;
} }
} finally { } finally {
semaphore.Release(); semaphore.Release();
} }
} }
private void DelegateOnItemFinished(object? sender, DownloadItem e) {
OnItemFinished?.Invoke(this, e);
}
} }

View File

@ -1,25 +1,23 @@
using System; using System;
using System.Linq; using System.Linq;
using System.Net.Http; using System.Net.Http;
using System.Reactive.Subjects;
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Server.Database; using DHT.Server.Database;
using DHT.Utils.Logging; using DHT.Utils.Logging;
using DHT.Utils.Models;
using DHT.Utils.Tasks; using DHT.Utils.Tasks;
namespace DHT.Server.Download; namespace DHT.Server.Download;
sealed class DownloaderTask : BaseModel { sealed class DownloaderTask : IAsyncDisposable {
private static readonly Log Log = Log.ForType<DownloaderTask>(); private static readonly Log Log = Log.ForType<DownloaderTask>();
private const int DownloadTasks = 4; private const int DownloadTasks = 4;
private const int QueueSize = 25; private const int QueueSize = 25;
private const string UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"; private const string UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36";
internal event EventHandler<DownloadItem>? OnItemFinished;
private readonly Channel<DownloadItem> downloadQueue = Channel.CreateBounded<DownloadItem>(new BoundedChannelOptions(QueueSize) { private readonly Channel<DownloadItem> downloadQueue = Channel.CreateBounded<DownloadItem>(new BoundedChannelOptions(QueueSize) {
SingleReader = false, SingleReader = false,
SingleWriter = true, SingleWriter = true,
@ -31,12 +29,16 @@ sealed class DownloaderTask : BaseModel {
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private readonly IDatabaseFile db; private readonly IDatabaseFile db;
private readonly Subject<DownloadItem> finishedItemPublisher = new ();
private readonly Task queueWriterTask; private readonly Task queueWriterTask;
private readonly Task[] downloadTasks; private readonly Task[] downloadTasks;
public IObservable<DownloadItem> FinishedItems => finishedItemPublisher;
internal DownloaderTask(IDatabaseFile db) { internal DownloaderTask(IDatabaseFile db) {
this.cancellationToken = cancellationTokenSource.Token;
this.db = db; this.db = db;
this.cancellationToken = cancellationTokenSource.Token;
this.queueWriterTask = Task.Run(RunQueueWriterTask); this.queueWriterTask = Task.Run(RunQueueWriterTask);
this.downloadTasks = Enumerable.Range(1, DownloadTasks).Select(taskIndex => Task.Run(() => RunDownloadTask(taskIndex))).ToArray(); this.downloadTasks = Enumerable.Range(1, DownloadTasks).Select(taskIndex => Task.Run(() => RunDownloadTask(taskIndex))).ToArray();
} }
@ -79,7 +81,7 @@ sealed class DownloaderTask : BaseModel {
log.Error(e); log.Error(e);
} finally { } finally {
try { try {
OnItemFinished?.Invoke(this, item); finishedItemPublisher.OnNext(item);
} catch (Exception e) { } catch (Exception e) {
log.Error("Caught exception in event handler: " + e); log.Error("Caught exception in event handler: " + e);
} }
@ -87,7 +89,7 @@ sealed class DownloaderTask : BaseModel {
} }
} }
internal async Task Stop() { public async ValueTask DisposeAsync() {
try { try {
await cancellationTokenSource.CancelAsync(); await cancellationTokenSource.CancelAsync();
} catch (Exception) { } catch (Exception) {
@ -102,6 +104,7 @@ sealed class DownloaderTask : BaseModel {
await Task.WhenAll(downloadTasks).WaitIgnoringCancellation(); await Task.WhenAll(downloadTasks).WaitIgnoringCancellation();
} finally { } finally {
cancellationTokenSource.Dispose(); cancellationTokenSource.Dispose();
finishedItemPublisher.OnCompleted();
} }
} }
} }

View File

@ -12,6 +12,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite" Version="8.0.0" /> <PackageReference Include="Microsoft.Data.Sqlite" Version="8.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -21,6 +21,7 @@ public sealed class State : IAsyncDisposable {
public async ValueTask DisposeAsync() { public async ValueTask DisposeAsync() {
await Downloader.Stop(); await Downloader.Stop();
await Server.Stop();
Db.Dispose(); Db.Dispose();
} }
} }