1
0
mirror of https://github.com/chylex/Discord-History-Tracker.git synced 2024-11-25 14:42:44 +01:00

Compare commits

..

No commits in common. "7173dc6cfc5ad0dbd4e8c9dec933f673bfdab89d" and "4929a19397e8893b49ee2024a481c5fa2f973546" have entirely different histories.

52 changed files with 733 additions and 1016 deletions

View File

@ -9,15 +9,15 @@
<entry key="Desktop/Dialogs/Progress/ProgressDialog.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Dialogs/Progress/ProgressDialog.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Dialogs/TextBox/TextBoxDialog.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Dialogs/TextBox/TextBoxDialog.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/AboutWindow.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/AboutWindow.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Controls/DownloadItemFilterPanel.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Controls/AttachmentFilterPanel.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Controls/MessageFilterPanel.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Controls/MessageFilterPanel.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Controls/ServerConfigurationPanel.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Controls/ServerConfigurationPanel.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Controls/StatusBar.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Controls/StatusBar.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/MainWindow.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/MainWindow.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/AdvancedPage.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Pages/AdvancedPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/AttachmentsPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/DatabasePage.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Pages/DatabasePage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/DebugPage.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Pages/DebugPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/DownloadsPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/TrackingPage.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Pages/TrackingPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Pages/ViewerPage.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Pages/ViewerPage.axaml" value="Desktop/Desktop.csproj" />
<entry key="Desktop/Main/Screens/MainContentScreen.axaml" value="Desktop/Desktop.csproj" /> <entry key="Desktop/Main/Screens/MainContentScreen.axaml" value="Desktop/Desktop.csproj" />
@ -25,4 +25,4 @@
</map> </map>
</option> </option>
</component> </component>
</project> </project>

View File

@ -29,7 +29,7 @@ sealed class BytesValueConverter : IValueConverter {
private const int Scale = 1000; private const int Scale = 1000;
public static string Convert(ulong size) { private static string Convert(ulong size) {
int power = size == 0L ? 0 : (int) Math.Log(size, Scale); int power = size == 0L ? 0 : (int) Math.Log(size, Scale);
int unit = power >= Units.Length ? Units.Length - 1 : power; int unit = power >= Units.Length ? Units.Length - 1 : power;
return Units[unit].Format(unit == 0 ? size : size / Math.Pow(Scale, unit)); return Units[unit].Format(unit == 0 ? size : size / Math.Pow(Scale, unit));

View File

@ -32,6 +32,10 @@
<ItemGroup> <ItemGroup>
<Compile Include="..\Version.cs" Link="Version.cs" /> <Compile Include="..\Version.cs" Link="Version.cs" />
<Compile Update="Dialogs\TextBox\TextBoxDialog.axaml.cs">
<DependentUpon>CheckBoxDialog.axaml</DependentUpon>
<SubType>Code</SubType>
</Compile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -4,11 +4,11 @@
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:controls="clr-namespace:DHT.Desktop.Main.Controls" xmlns:controls="clr-namespace:DHT.Desktop.Main.Controls"
mc:Ignorable="d" mc:Ignorable="d"
x:Class="DHT.Desktop.Main.Controls.DownloadItemFilterPanel" x:Class="DHT.Desktop.Main.Controls.AttachmentFilterPanel"
x:DataType="controls:DownloadItemFilterPanelModel"> x:DataType="controls:AttachmentFilterPanelModel">
<Design.DataContext> <Design.DataContext>
<controls:DownloadItemFilterPanelModel /> <controls:AttachmentFilterPanelModel />
</Design.DataContext> </Design.DataContext>
<UserControl.Styles> <UserControl.Styles>

View File

@ -4,8 +4,8 @@ using Avalonia.Controls;
namespace DHT.Desktop.Main.Controls; namespace DHT.Desktop.Main.Controls;
[SuppressMessage("ReSharper", "MemberCanBeInternal")] [SuppressMessage("ReSharper", "MemberCanBeInternal")]
public sealed partial class DownloadItemFilterPanel : UserControl { public sealed partial class AttachmentFilterPanel : UserControl {
public DownloadItemFilterPanel() { public AttachmentFilterPanel() {
InitializeComponent(); InitializeComponent();
} }
} }

View File

@ -12,7 +12,7 @@ using DHT.Utils.Tasks;
namespace DHT.Desktop.Main.Controls; namespace DHT.Desktop.Main.Controls;
sealed partial class DownloadItemFilterPanelModel : ObservableObject, IDisposable { sealed partial class AttachmentFilterPanelModel : ObservableObject, IDisposable {
public sealed record Unit(string Name, uint Scale); public sealed record Unit(string Name, uint Scale);
private static readonly Unit[] AllUnits = [ private static readonly Unit[] AllUnits = [
@ -43,21 +43,21 @@ sealed partial class DownloadItemFilterPanelModel : ObservableObject, IDisposabl
private readonly State state; private readonly State state;
private readonly string verb; private readonly string verb;
private readonly RestartableTask<long> downloadItemCountTask; private readonly RestartableTask<long> matchingAttachmentCountTask;
private long? matchingItemCount; private long? matchingAttachmentCount;
private readonly IDisposable downloadItemCountSubscription; private readonly IDisposable attachmentCountSubscription;
private long? totalItemCount; private long? totalAttachmentCount;
[Obsolete("Designer")] [Obsolete("Designer")]
public DownloadItemFilterPanelModel() : this(State.Dummy) {} public AttachmentFilterPanelModel() : this(State.Dummy) {}
public DownloadItemFilterPanelModel(State state, string verb = "Matches") { public AttachmentFilterPanelModel(State state, string verb = "Matches") {
this.state = state; this.state = state;
this.verb = verb; this.verb = verb;
this.downloadItemCountTask = new RestartableTask<long>(SetMatchingCount, TaskScheduler.FromCurrentSynchronizationContext()); this.matchingAttachmentCountTask = new RestartableTask<long>(SetAttachmentCounts, TaskScheduler.FromCurrentSynchronizationContext());
this.downloadItemCountSubscription = state.Db.Downloads.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnDownloadItemCountChanged); this.attachmentCountSubscription = state.Db.Attachments.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnAttachmentCountChanged);
UpdateFilterStatistics(); UpdateFilterStatistics();
@ -65,8 +65,7 @@ sealed partial class DownloadItemFilterPanelModel : ObservableObject, IDisposabl
} }
public void Dispose() { public void Dispose() {
downloadItemCountTask.Cancel(); attachmentCountSubscription.Dispose();
downloadItemCountSubscription.Dispose();
} }
private void OnPropertyChanged(object? sender, PropertyChangedEventArgs e) { private void OnPropertyChanged(object? sender, PropertyChangedEventArgs e) {
@ -75,8 +74,8 @@ sealed partial class DownloadItemFilterPanelModel : ObservableObject, IDisposabl
} }
} }
private void OnDownloadItemCountChanged(long newItemCount) { private void OnAttachmentCountChanged(long newAttachmentCount) {
totalItemCount = newItemCount; totalAttachmentCount = newAttachmentCount;
UpdateFilterStatistics(); UpdateFilterStatistics();
} }
@ -84,32 +83,32 @@ sealed partial class DownloadItemFilterPanelModel : ObservableObject, IDisposabl
private void UpdateFilterStatistics() { private void UpdateFilterStatistics() {
var filter = CreateFilter(); var filter = CreateFilter();
if (filter.IsEmpty) { if (filter.IsEmpty) {
downloadItemCountTask.Cancel(); matchingAttachmentCountTask.Cancel();
matchingItemCount = totalItemCount; matchingAttachmentCount = totalAttachmentCount;
UpdateFilterStatisticsText(); UpdateFilterStatisticsText();
} }
else { else {
matchingItemCount = null; matchingAttachmentCount = null;
UpdateFilterStatisticsText(); UpdateFilterStatisticsText();
downloadItemCountTask.Restart(cancellationToken => state.Db.Downloads.Count(filter, cancellationToken)); matchingAttachmentCountTask.Restart(cancellationToken => state.Db.Attachments.Count(filter, cancellationToken));
} }
} }
private void SetMatchingCount(long matchingAttachmentCount) { private void SetAttachmentCounts(long matchingAttachmentCount) {
this.matchingItemCount = matchingAttachmentCount; this.matchingAttachmentCount = matchingAttachmentCount;
UpdateFilterStatisticsText(); UpdateFilterStatisticsText();
} }
private void UpdateFilterStatisticsText() { private void UpdateFilterStatisticsText() {
var matchingItemCountStr = matchingItemCount?.Format() ?? "(...)"; var matchingAttachmentCountStr = matchingAttachmentCount?.Format() ?? "(...)";
var totalItemCountStr = totalItemCount?.Format() ?? "(...)"; var totalAttachmentCountStr = totalAttachmentCount?.Format() ?? "(...)";
FilterStatisticsText = verb + " " + matchingItemCountStr + " out of " + totalItemCountStr + " file" + (totalItemCount is null or 1 ? "." : "s."); FilterStatisticsText = verb + " " + matchingAttachmentCountStr + " out of " + totalAttachmentCountStr + " attachment" + (totalAttachmentCount is null or 1 ? "." : "s.");
OnPropertyChanged(nameof(FilterStatisticsText)); OnPropertyChanged(nameof(FilterStatisticsText));
} }
public DownloadItemFilter CreateFilter() { public AttachmentFilter CreateFilter() {
DownloadItemFilter filter = new (); AttachmentFilter filter = new ();
if (LimitSize) { if (LimitSize) {
try { try {

View File

@ -5,11 +5,11 @@
xmlns:pages="clr-namespace:DHT.Desktop.Main.Pages" xmlns:pages="clr-namespace:DHT.Desktop.Main.Pages"
xmlns:controls="clr-namespace:DHT.Desktop.Main.Controls" xmlns:controls="clr-namespace:DHT.Desktop.Main.Controls"
mc:Ignorable="d" d:DesignWidth="800" d:DesignHeight="450" mc:Ignorable="d" d:DesignWidth="800" d:DesignHeight="450"
x:Class="DHT.Desktop.Main.Pages.DownloadsPage" x:Class="DHT.Desktop.Main.Pages.AttachmentsPage"
x:DataType="pages:DownloadsPageModel"> x:DataType="pages:AttachmentsPageModel">
<Design.DataContext> <Design.DataContext>
<pages:DownloadsPageModel /> <pages:AttachmentsPageModel />
</Design.DataContext> </Design.DataContext>
<UserControl.Styles> <UserControl.Styles>
@ -31,15 +31,19 @@
</UserControl.Styles> </UserControl.Styles>
<StackPanel Orientation="Vertical" Spacing="20"> <StackPanel Orientation="Vertical" Spacing="20">
<Button Command="{Binding OnClickToggleDownload}" Content="{Binding ToggleDownloadButtonText}" IsEnabled="{Binding IsToggleDownloadButtonEnabled}" /> <DockPanel>
<controls:DownloadItemFilterPanel DataContext="{Binding FilterModel}" IsEnabled="{Binding !$parent[UserControl].((pages:DownloadsPageModel)DataContext).IsDownloading}" /> <Button Command="{Binding OnClickToggleDownload}" Content="{Binding ToggleDownloadButtonText}" IsEnabled="{Binding IsToggleDownloadButtonEnabled}" DockPanel.Dock="Left" />
<TextBlock Text="{Binding DownloadMessage}" MinWidth="100" Margin="10 0 0 0" VerticalAlignment="Center" TextAlignment="Right" DockPanel.Dock="Left" />
<ProgressBar Value="{Binding DownloadProgress}" IsVisible="{Binding IsDownloading}" Margin="15 0" VerticalAlignment="Center" DockPanel.Dock="Right" />
</DockPanel>
<controls:AttachmentFilterPanel DataContext="{Binding FilterModel}" IsEnabled="{Binding !IsDownloading, RelativeSource={RelativeSource AncestorType=pages:AttachmentsPageModel}}" />
<StackPanel Orientation="Vertical" Spacing="12"> <StackPanel Orientation="Vertical" Spacing="12">
<Expander Header="Download Status" IsExpanded="True"> <Expander Header="Download Status" IsExpanded="True">
<DataGrid ItemsSource="{Binding StatisticsRows}" AutoGenerateColumns="False" CanUserReorderColumns="False" CanUserResizeColumns="False" CanUserSortColumns="False" IsReadOnly="True"> <DataGrid ItemsSource="{Binding StatisticsRows}" AutoGenerateColumns="False" CanUserReorderColumns="False" CanUserResizeColumns="False" CanUserSortColumns="False" IsReadOnly="True">
<DataGrid.Columns> <DataGrid.Columns>
<DataGridTextColumn Header="State" Binding="{Binding State, Mode=OneWay}" Width="*" /> <DataGridTextColumn Header="State" Binding="{Binding State}" Width="*" />
<DataGridTextColumn Header="Files" Binding="{Binding Items, Mode=OneWay, Converter={StaticResource NumberValueConverter}}" Width="*" CellStyleClasses="right" /> <DataGridTextColumn Header="Attachments" Binding="{Binding Items, Mode=OneWay, Converter={StaticResource NumberValueConverter}}" Width="*" CellStyleClasses="right" />
<DataGridTextColumn Header="Size" Binding="{Binding SizeText, Mode=OneWay}" Width="*" CellStyleClasses="right" /> <DataGridTextColumn Header="Size" Binding="{Binding Size, Mode=OneWay, Converter={StaticResource BytesValueConverter}}" Width="*" CellStyleClasses="right" />
</DataGrid.Columns> </DataGrid.Columns>
</DataGrid> </DataGrid>
</Expander> </Expander>

View File

@ -4,8 +4,8 @@ using Avalonia.Controls;
namespace DHT.Desktop.Main.Pages; namespace DHT.Desktop.Main.Pages;
[SuppressMessage("ReSharper", "MemberCanBeInternal")] [SuppressMessage("ReSharper", "MemberCanBeInternal")]
public sealed partial class DownloadsPage : UserControl { public sealed partial class AttachmentsPage : UserControl {
public DownloadsPage() { public AttachmentsPage() {
InitializeComponent(); InitializeComponent();
} }
} }

View File

@ -0,0 +1,253 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Avalonia.Controls;
using Avalonia.ReactiveUI;
using CommunityToolkit.Mvvm.ComponentModel;
using DHT.Desktop.Common;
using DHT.Desktop.Dialogs.Message;
using DHT.Desktop.Main.Controls;
using DHT.Server;
using DHT.Server.Data;
using DHT.Server.Data.Aggregations;
using DHT.Server.Data.Filters;
using DHT.Utils.Logging;
using DHT.Utils.Tasks;
namespace DHT.Desktop.Main.Pages;
sealed partial class AttachmentsPageModel : ObservableObject, IDisposable {
private static readonly Log Log = Log.ForType<AttachmentsPageModel>();
private static readonly DownloadItemFilter EnqueuedItemFilter = new () {
IncludeStatuses = new HashSet<DownloadStatus> {
DownloadStatus.Enqueued,
DownloadStatus.Downloading
}
};
[ObservableProperty(Setter = Access.Private)]
private bool isToggleDownloadButtonEnabled = true;
public string ToggleDownloadButtonText => IsDownloading ? "Stop Downloading" : "Start Downloading";
[ObservableProperty(Setter = Access.Private)]
[NotifyPropertyChangedFor(nameof(IsRetryFailedOnDownloadsButtonEnabled))]
private bool isRetryingFailedDownloads = false;
[ObservableProperty(Setter = Access.Private)]
[NotifyPropertyChangedFor(nameof(IsRetryFailedOnDownloadsButtonEnabled))]
private bool hasFailedDownloads;
public bool IsRetryFailedOnDownloadsButtonEnabled => !IsRetryingFailedDownloads && HasFailedDownloads;
[ObservableProperty(Setter = Access.Private)]
private string downloadMessage = "";
public double DownloadProgress => totalItemsToDownloadCount is null or 0 ? 0.0 : 100.0 * doneItemsCount / totalItemsToDownloadCount.Value;
public AttachmentFilterPanelModel FilterModel { get; }
private readonly StatisticsRow statisticsEnqueued = new ("Enqueued");
private readonly StatisticsRow statisticsDownloaded = new ("Downloaded");
private readonly StatisticsRow statisticsFailed = new ("Failed");
private readonly StatisticsRow statisticsSkipped = new ("Skipped");
public ObservableCollection<StatisticsRow> StatisticsRows { get; }
public bool IsDownloading => state.Downloader.IsDownloading;
private readonly Window window;
private readonly State state;
private readonly ThrottledTask<int> enqueueDownloadItemsTask;
private readonly ThrottledTask<DownloadStatusStatistics> downloadStatisticsTask;
private readonly IDisposable attachmentCountSubscription;
private readonly IDisposable downloadCountSubscription;
private IDisposable? finishedItemsSubscription;
private int doneItemsCount;
private int totalEnqueuedItemCount;
private int? totalItemsToDownloadCount;
public AttachmentsPageModel() : this(null!, State.Dummy) {}
public AttachmentsPageModel(Window window, State state) {
this.window = window;
this.state = state;
FilterModel = new AttachmentFilterPanelModel(state);
StatisticsRows = [
statisticsEnqueued,
statisticsDownloaded,
statisticsFailed,
statisticsSkipped
];
enqueueDownloadItemsTask = new ThrottledTask<int>(OnItemsEnqueued, TaskScheduler.FromCurrentSynchronizationContext());
downloadStatisticsTask = new ThrottledTask<DownloadStatusStatistics>(UpdateStatistics, TaskScheduler.FromCurrentSynchronizationContext());
attachmentCountSubscription = state.Db.Attachments.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnAttachmentCountChanged);
downloadCountSubscription = state.Db.Downloads.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnDownloadCountChanged);
RecomputeDownloadStatistics();
}
public void Dispose() {
attachmentCountSubscription.Dispose();
downloadCountSubscription.Dispose();
finishedItemsSubscription?.Dispose();
enqueueDownloadItemsTask.Dispose();
downloadStatisticsTask.Dispose();
FilterModel.Dispose();
}
private void OnAttachmentCountChanged(long newAttachmentCount) {
if (IsDownloading) {
EnqueueDownloadItemsLater();
}
else {
RecomputeDownloadStatistics();
}
}
private void OnDownloadCountChanged(long newDownloadCount) {
RecomputeDownloadStatistics();
}
private async Task EnqueueDownloadItems() {
try {
OnItemsEnqueued(await state.Db.Downloads.EnqueueDownloadItems(CreateAttachmentFilter()));
} catch (Exception e) {
Log.Error(e);
await Dialog.ShowOk(window, "Download Error", "Failed to enqueue items for download.");
}
}
private void EnqueueDownloadItemsLater() {
var filter = CreateAttachmentFilter();
enqueueDownloadItemsTask.Post(cancellationToken => state.Db.Downloads.EnqueueDownloadItems(filter, cancellationToken));
}
private void OnItemsEnqueued(int itemCount) {
totalEnqueuedItemCount += itemCount;
totalItemsToDownloadCount = totalEnqueuedItemCount;
UpdateDownloadMessage();
RecomputeDownloadStatistics();
}
private AttachmentFilter CreateAttachmentFilter() {
var filter = FilterModel.CreateFilter();
filter.DownloadItemRule = AttachmentFilter.DownloadItemRules.OnlyNotPresent;
return filter;
}
public async Task OnClickToggleDownload() {
IsToggleDownloadButtonEnabled = false;
if (IsDownloading) {
await state.Downloader.Stop();
finishedItemsSubscription?.Dispose();
finishedItemsSubscription = null;
RecomputeDownloadStatistics();
await state.Db.Downloads.RemoveDownloadItems(EnqueuedItemFilter, FilterRemovalMode.RemoveMatching);
doneItemsCount = 0;
totalEnqueuedItemCount = 0;
totalItemsToDownloadCount = null;
UpdateDownloadMessage();
}
else {
var finishedItems = await state.Downloader.Start();
finishedItemsSubscription = finishedItems.Select(static _ => true)
.Buffer(TimeSpan.FromMilliseconds(100))
.Select(static items => items.Count)
.Where(static items => items > 0)
.ObserveOn(AvaloniaScheduler.Instance)
.Subscribe(OnItemsFinished);
await EnqueueDownloadItems();
}
OnPropertyChanged(nameof(ToggleDownloadButtonText));
OnPropertyChanged(nameof(IsDownloading));
IsToggleDownloadButtonEnabled = true;
}
private void OnItemsFinished(int finishedItemCount) {
doneItemsCount += finishedItemCount;
UpdateDownloadMessage();
}
public async Task OnClickRetryFailedDownloads() {
IsRetryingFailedDownloads = true;
try {
var allExceptFailedFilter = new DownloadItemFilter {
IncludeStatuses = new HashSet<DownloadStatus> {
DownloadStatus.Enqueued,
DownloadStatus.Downloading,
DownloadStatus.Success
}
};
await state.Db.Downloads.RemoveDownloadItems(allExceptFailedFilter, FilterRemovalMode.KeepMatching);
if (IsDownloading) {
await EnqueueDownloadItems();
}
} catch (Exception e) {
Log.Error(e);
} finally {
IsRetryingFailedDownloads = false;
}
}
private void RecomputeDownloadStatistics() {
downloadStatisticsTask.Post(state.Db.Downloads.GetStatistics);
}
private void UpdateStatistics(DownloadStatusStatistics statusStatistics) {
statisticsEnqueued.Items = statusStatistics.EnqueuedCount;
statisticsEnqueued.Size = statusStatistics.EnqueuedSize;
statisticsDownloaded.Items = statusStatistics.SuccessfulCount;
statisticsDownloaded.Size = statusStatistics.SuccessfulSize;
statisticsFailed.Items = statusStatistics.FailedCount;
statisticsFailed.Size = statusStatistics.FailedSize;
statisticsSkipped.Items = statusStatistics.SkippedCount;
statisticsSkipped.Size = statusStatistics.SkippedSize;
HasFailedDownloads = statusStatistics.FailedCount > 0;
UpdateDownloadMessage();
}
private void UpdateDownloadMessage() {
DownloadMessage = IsDownloading ? doneItemsCount.Format() + " / " + (totalItemsToDownloadCount?.Format() ?? "?") : "";
OnPropertyChanged(nameof(DownloadProgress));
}
[ObservableObject]
public sealed partial class StatisticsRow(string state) {
public string State { get; } = state;
[ObservableProperty]
private int items;
[ObservableProperty]
private ulong? size;
}
}

View File

@ -1,186 +0,0 @@
using System;
using System.Collections.ObjectModel;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Avalonia.ReactiveUI;
using CommunityToolkit.Mvvm.ComponentModel;
using DHT.Desktop.Common;
using DHT.Desktop.Main.Controls;
using DHT.Server;
using DHT.Server.Data.Aggregations;
using DHT.Server.Data.Filters;
using DHT.Server.Download;
using DHT.Utils.Logging;
using DHT.Utils.Tasks;
namespace DHT.Desktop.Main.Pages;
sealed partial class DownloadsPageModel : ObservableObject, IDisposable {
private static readonly Log Log = Log.ForType<DownloadsPageModel>();
[ObservableProperty(Setter = Access.Private)]
private bool isToggleDownloadButtonEnabled = true;
public string ToggleDownloadButtonText => IsDownloading ? "Stop Downloading" : "Start Downloading";
[ObservableProperty(Setter = Access.Private)]
[NotifyPropertyChangedFor(nameof(IsRetryFailedOnDownloadsButtonEnabled))]
private bool isRetryingFailedDownloads = false;
[ObservableProperty(Setter = Access.Private)]
[NotifyPropertyChangedFor(nameof(IsRetryFailedOnDownloadsButtonEnabled))]
private bool hasFailedDownloads;
public bool IsRetryFailedOnDownloadsButtonEnabled => !IsRetryingFailedDownloads && HasFailedDownloads;
[ObservableProperty(Setter = Access.Private)]
private string downloadMessage = "";
public DownloadItemFilterPanelModel FilterModel { get; }
private readonly StatisticsRow statisticsPending = new ("Pending");
private readonly StatisticsRow statisticsDownloaded = new ("Downloaded");
private readonly StatisticsRow statisticsFailed = new ("Failed");
private readonly StatisticsRow statisticsSkipped = new ("Skipped");
public ObservableCollection<StatisticsRow> StatisticsRows { get; }
public bool IsDownloading => state.Downloader.IsDownloading;
private readonly State state;
private readonly ThrottledTask<DownloadStatusStatistics> downloadStatisticsTask;
private readonly IDisposable downloadItemCountSubscription;
private IDisposable? finishedItemsSubscription;
private DownloadItemFilter? currentDownloadFilter;
public DownloadsPageModel() : this(State.Dummy) {}
public DownloadsPageModel(State state) {
this.state = state;
FilterModel = new DownloadItemFilterPanelModel(state);
StatisticsRows = [
statisticsPending,
statisticsDownloaded,
statisticsFailed,
statisticsSkipped
];
downloadStatisticsTask = new ThrottledTask<DownloadStatusStatistics>(Log, UpdateStatistics, TaskScheduler.FromCurrentSynchronizationContext());
downloadItemCountSubscription = state.Db.Downloads.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnDownloadCountChanged);
RecomputeDownloadStatistics();
}
public void Dispose() {
finishedItemsSubscription?.Dispose();
downloadItemCountSubscription.Dispose();
downloadStatisticsTask.Dispose();
FilterModel.Dispose();
}
private void OnDownloadCountChanged(long newDownloadCount) {
RecomputeDownloadStatistics();
}
public async Task OnClickToggleDownload() {
IsToggleDownloadButtonEnabled = false;
if (IsDownloading) {
await state.Downloader.Stop();
await state.Db.Downloads.MoveDownloadingItemsBackToQueue();
finishedItemsSubscription?.Dispose();
finishedItemsSubscription = null;
currentDownloadFilter = null;
}
else {
await state.Db.Downloads.MoveDownloadingItemsBackToQueue();
var finishedItems = await state.Downloader.Start(currentDownloadFilter = FilterModel.CreateFilter());
finishedItemsSubscription = finishedItems.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnItemFinished);
}
RecomputeDownloadStatistics();
OnPropertyChanged(nameof(ToggleDownloadButtonText));
OnPropertyChanged(nameof(IsDownloading));
IsToggleDownloadButtonEnabled = true;
}
private void OnItemFinished(DownloadItem item) {
RecomputeDownloadStatistics();
}
public async Task OnClickRetryFailedDownloads() {
IsRetryingFailedDownloads = true;
try {
await state.Db.Downloads.RetryFailed();
RecomputeDownloadStatistics();
} catch (Exception e) {
Log.Error(e);
} finally {
IsRetryingFailedDownloads = false;
}
}
private void RecomputeDownloadStatistics() {
downloadStatisticsTask.Post(cancellationToken => state.Db.Downloads.GetStatistics(currentDownloadFilter ?? new DownloadItemFilter(), cancellationToken));
}
private void UpdateStatistics(DownloadStatusStatistics statusStatistics) {
statisticsPending.Items = statusStatistics.PendingCount;
statisticsPending.Size = statusStatistics.PendingTotalSize;
statisticsPending.HasFilesWithUnknownSize = statusStatistics.PendingWithUnknownSizeCount > 0;
statisticsDownloaded.Items = statusStatistics.SuccessfulCount;
statisticsDownloaded.Size = statusStatistics.SuccessfulTotalSize;
statisticsDownloaded.HasFilesWithUnknownSize = statusStatistics.SuccessfulWithUnknownSizeCount > 0;
statisticsFailed.Items = statusStatistics.FailedCount;
statisticsFailed.Size = statusStatistics.FailedTotalSize;
statisticsFailed.HasFilesWithUnknownSize = statusStatistics.FailedWithUnknownSizeCount > 0;
statisticsSkipped.Items = statusStatistics.SkippedCount;
statisticsSkipped.Size = statusStatistics.SkippedTotalSize;
statisticsSkipped.HasFilesWithUnknownSize = statusStatistics.SkippedWithUnknownSizeCount > 0;
HasFailedDownloads = statusStatistics.FailedCount > 0;
}
[ObservableObject]
public sealed partial class StatisticsRow(string state) {
public string State { get; } = state;
[ObservableProperty]
private int items;
[ObservableProperty]
[NotifyPropertyChangedFor(nameof(SizeText))]
private ulong? size;
[ObservableProperty]
[NotifyPropertyChangedFor(nameof(SizeText))]
private bool hasFilesWithUnknownSize;
public string SizeText {
get {
if (size == null) {
return "-";
}
else if (hasFilesWithUnknownSize) {
return "\u2265 " + BytesValueConverter.Convert(size.Value);
}
else {
return BytesValueConverter.Convert(size.Value);
}
}
}
}
}

View File

@ -74,7 +74,7 @@
<DockPanel> <DockPanel>
<Border Classes="statusBar" DockPanel.Dock="Bottom"> <Border Classes="statusBar" DockPanel.Dock="Bottom">
<DockPanel> <DockPanel>
<TextBlock Classes="invisibleTabItem" DockPanel.Dock="Left">Downloads</TextBlock> <TextBlock Classes="invisibleTabItem" DockPanel.Dock="Left">Attachments</TextBlock>
<controls:StatusBar DataContext="{Binding StatusBarModel}" DockPanel.Dock="Right" /> <controls:StatusBar DataContext="{Binding StatusBarModel}" DockPanel.Dock="Right" />
</DockPanel> </DockPanel>
</Border> </Border>
@ -94,9 +94,9 @@
<ContentPresenter Content="{Binding TrackingPage}" Classes="page" /> <ContentPresenter Content="{Binding TrackingPage}" Classes="page" />
</ScrollViewer> </ScrollViewer>
</TabItem> </TabItem>
<TabItem x:Name="TabDownloads" Header="Downloads" Grid.Row="2"> <TabItem x:Name="TabAttachments" Header="Attachments" Grid.Row="2">
<ScrollViewer> <ScrollViewer>
<ContentPresenter Content="{Binding DownloadsPage}" Classes="page" /> <ContentPresenter Content="{Binding AttachmentsPage}" Classes="page" />
</ScrollViewer> </ScrollViewer>
</TabItem> </TabItem>
<TabItem x:Name="TabViewer" Header="Viewer" Grid.Row="3"> <TabItem x:Name="TabViewer" Header="Viewer" Grid.Row="3">

View File

@ -13,8 +13,8 @@ sealed class MainContentScreenModel : IDisposable {
public TrackingPage TrackingPage { get; } public TrackingPage TrackingPage { get; }
private TrackingPageModel TrackingPageModel { get; } private TrackingPageModel TrackingPageModel { get; }
public DownloadsPage DownloadsPage { get; } public AttachmentsPage AttachmentsPage { get; }
private DownloadsPageModel DownloadsPageModel { get; } private AttachmentsPageModel AttachmentsPageModel { get; }
public ViewerPage ViewerPage { get; } public ViewerPage ViewerPage { get; }
private ViewerPageModel ViewerPageModel { get; } private ViewerPageModel ViewerPageModel { get; }
@ -52,8 +52,8 @@ sealed class MainContentScreenModel : IDisposable {
TrackingPageModel = new TrackingPageModel(window); TrackingPageModel = new TrackingPageModel(window);
TrackingPage = new TrackingPage { DataContext = TrackingPageModel }; TrackingPage = new TrackingPage { DataContext = TrackingPageModel };
DownloadsPageModel = new DownloadsPageModel(state); AttachmentsPageModel = new AttachmentsPageModel(window, state);
DownloadsPage = new DownloadsPage { DataContext = DownloadsPageModel }; AttachmentsPage = new AttachmentsPage { DataContext = AttachmentsPageModel };
ViewerPageModel = new ViewerPageModel(window, state); ViewerPageModel = new ViewerPageModel(window, state);
ViewerPage = new ViewerPage { DataContext = ViewerPageModel }; ViewerPage = new ViewerPage { DataContext = ViewerPageModel };
@ -72,7 +72,7 @@ sealed class MainContentScreenModel : IDisposable {
} }
public void Dispose() { public void Dispose() {
DownloadsPageModel.Dispose(); AttachmentsPageModel.Dispose();
ViewerPageModel.Dispose(); ViewerPageModel.Dispose();
AdvancedPageModel.Dispose(); AdvancedPageModel.Dispose();
StatusBarModel.Dispose(); StatusBarModel.Dispose();

View File

@ -1,19 +1,15 @@
namespace DHT.Server.Data.Aggregations; namespace DHT.Server.Data.Aggregations;
public sealed class DownloadStatusStatistics { public sealed class DownloadStatusStatistics {
public int PendingCount { get; internal init; } public int EnqueuedCount { get; internal set; }
public ulong PendingTotalSize { get; internal init; } public ulong EnqueuedSize { get; internal set; }
public int PendingWithUnknownSizeCount { get; internal init; }
public int SuccessfulCount { get; internal init; } public int SuccessfulCount { get; internal set; }
public ulong SuccessfulTotalSize { get; internal init; } public ulong SuccessfulSize { get; internal set; }
public int SuccessfulWithUnknownSizeCount { get; internal init; }
public int FailedCount { get; internal init; } public int FailedCount { get; internal set; }
public ulong FailedTotalSize { get; internal init; } public ulong FailedSize { get; internal set; }
public int FailedWithUnknownSizeCount { get; internal init; }
public int SkippedCount { get; internal set; }
public int SkippedCount { get; internal init; } public ulong SkippedSize { get; internal set; }
public ulong SkippedTotalSize { get; internal init; }
public int SkippedWithUnknownSizeCount { get; internal init; }
} }

View File

@ -1,17 +1,33 @@
using System;
using System.Net;
using DHT.Server.Download;
namespace DHT.Server.Data; namespace DHT.Server.Data;
public sealed class Download { public readonly struct Download {
internal static Download NewSuccess(DownloadItem item, byte[] data) {
return new Download(item.NormalizedUrl, item.DownloadUrl, DownloadStatus.Success, (ulong) Math.Max(data.LongLength, 0), data);
}
internal static Download NewFailure(DownloadItem item, HttpStatusCode? statusCode, ulong size) {
return new Download(item.NormalizedUrl, item.DownloadUrl, statusCode.HasValue ? (DownloadStatus) (int) statusCode : DownloadStatus.GenericError, size);
}
public string NormalizedUrl { get; } public string NormalizedUrl { get; }
public string DownloadUrl { get; } public string DownloadUrl { get; }
public DownloadStatus Status { get; } public DownloadStatus Status { get; }
public string? Type { get; } public ulong Size { get; }
public ulong? Size { get; } public byte[]? Data { get; }
internal Download(string normalizedUrl, string downloadUrl, DownloadStatus status, string? type, ulong? size) { internal Download(string normalizedUrl, string downloadUrl, DownloadStatus status, ulong size, byte[]? data = null) {
NormalizedUrl = normalizedUrl; NormalizedUrl = normalizedUrl;
DownloadUrl = downloadUrl; DownloadUrl = downloadUrl;
Status = status; Status = status;
Type = type;
Size = size; Size = size;
Data = data;
}
internal Download WithData(byte[] data) {
return new Download(NormalizedUrl, DownloadUrl, Status, Size, data);
} }
} }

View File

@ -6,9 +6,8 @@ namespace DHT.Server.Data;
/// Extends <see cref="HttpStatusCode"/> with custom status codes in the range 0-99. /// Extends <see cref="HttpStatusCode"/> with custom status codes in the range 0-99.
/// </summary> /// </summary>
public enum DownloadStatus { public enum DownloadStatus {
Pending = 0, Enqueued = 0,
GenericError = 1, GenericError = 1,
Downloading = 2, Downloading = 2,
LastCustomCode = 99,
Success = HttpStatusCode.OK Success = HttpStatusCode.OK
} }

View File

@ -1,3 +0,0 @@
namespace DHT.Server.Data;
public readonly record struct DownloadWithData(Download Download, byte[]? Data);

View File

@ -0,0 +1,6 @@
namespace DHT.Server.Data;
public readonly struct DownloadedAttachment {
public string? Type { get; internal init; }
public byte[] Data { get; internal init; }
}

View File

@ -1,17 +0,0 @@
namespace DHT.Server.Data.Embeds;
sealed class DiscordEmbedJson {
public string? Type { get; set; }
public string? Url { get; set; }
public JsonImage? Image { get; set; }
public JsonImage? Thumbnail { get; set; }
public JsonImage? Video { get; set; }
public sealed class JsonImage {
public string? Url { get; set; }
public string? ProxyUrl { get; set; }
public int? Width { get; set; }
public int? Height { get; set; }
}
}

View File

@ -1,7 +0,0 @@
using System.Text.Json.Serialization;
namespace DHT.Server.Data.Embeds;
[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.SnakeCaseLower, GenerationMode = JsonSourceGenerationMode.Default)]
[JsonSerializable(typeof(DiscordEmbedJson))]
sealed partial class DiscordEmbedJsonContext : JsonSerializerContext;

View File

@ -0,0 +1,15 @@
namespace DHT.Server.Data.Filters;
public sealed class AttachmentFilter {
public ulong? MaxBytes { get; set; } = null;
public DownloadItemRules? DownloadItemRule { get; set; } = null;
public bool IsEmpty => MaxBytes == null &&
DownloadItemRule == null;
public enum DownloadItemRules {
OnlyNotPresent,
OnlyPresent
}
}

View File

@ -3,10 +3,8 @@ using System.Collections.Generic;
namespace DHT.Server.Data.Filters; namespace DHT.Server.Data.Filters;
public sealed class DownloadItemFilter { public sealed class DownloadItemFilter {
public HashSet<DownloadStatus>? IncludeStatuses { get; set; } = null; public HashSet<DownloadStatus>? IncludeStatuses { get; init; } = null;
public HashSet<DownloadStatus>? ExcludeStatuses { get; set; } = null; public HashSet<DownloadStatus>? ExcludeStatuses { get; init; } = null;
public ulong? MaxBytes { get; set; } = null;
public bool IsEmpty => IncludeStatuses == null && ExcludeStatuses == null && MaxBytes == null; public bool IsEmpty => IncludeStatuses == null && ExcludeStatuses == null;
} }

View File

@ -25,8 +25,8 @@ public static class DatabaseExtensions {
await target.Messages.Add(batchedMessages); await target.Messages.Add(batchedMessages);
await foreach (var download in source.Downloads.Get()) { await foreach (var download in source.Downloads.GetWithoutData()) {
await target.Downloads.AddDownload(await source.Downloads.HydrateWithData(download)); await target.Downloads.AddDownload(download.Status == DownloadStatus.Success ? await source.Downloads.HydrateWithData(download) : download);
} }
} }
} }

View File

@ -14,6 +14,7 @@ sealed class DummyDatabaseFile : IDatabaseFile {
public IServerRepository Servers { get; } = new IServerRepository.Dummy(); public IServerRepository Servers { get; } = new IServerRepository.Dummy();
public IChannelRepository Channels { get; } = new IChannelRepository.Dummy(); public IChannelRepository Channels { get; } = new IChannelRepository.Dummy();
public IMessageRepository Messages { get; } = new IMessageRepository.Dummy(); public IMessageRepository Messages { get; } = new IMessageRepository.Dummy();
public IAttachmentRepository Attachments { get; } = new IAttachmentRepository.Dummy();
public IDownloadRepository Downloads { get; } = new IDownloadRepository.Dummy(); public IDownloadRepository Downloads { get; } = new IDownloadRepository.Dummy();
private DummyDatabaseFile() {} private DummyDatabaseFile() {}

View File

@ -13,6 +13,6 @@ public sealed class LiveViewerExportStrategy : IViewerExportStrategy {
} }
public string GetAttachmentUrl(Attachment attachment) { public string GetAttachmentUrl(Attachment attachment) {
return "http://127.0.0.1:" + safePort + "/get-downloaded-file/" + WebUtility.UrlEncode(attachment.NormalizedUrl) + "?token=" + safeToken; return "http://127.0.0.1:" + safePort + "/get-attachment/" + WebUtility.UrlEncode(attachment.NormalizedUrl) + "?token=" + safeToken;
} }
} }

View File

@ -11,6 +11,7 @@ public interface IDatabaseFile : IAsyncDisposable {
IServerRepository Servers { get; } IServerRepository Servers { get; }
IChannelRepository Channels { get; } IChannelRepository Channels { get; }
IMessageRepository Messages { get; } IMessageRepository Messages { get; }
IAttachmentRepository Attachments { get; }
IDownloadRepository Downloads { get; } IDownloadRepository Downloads { get; }
Task Vacuum(); Task Vacuum();

View File

@ -0,0 +1,21 @@
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using DHT.Server.Data.Filters;
namespace DHT.Server.Database.Repositories;
public interface IAttachmentRepository {
IObservable<long> TotalCount { get; }
Task<long> Count(AttachmentFilter? filter = null, CancellationToken cancellationToken = default);
internal sealed class Dummy : IAttachmentRepository {
public IObservable<long> TotalCount { get; } = Observable.Return(0L);
public Task<long> Count(AttachmentFilter? filter = null, CancellationToken cancellationToken = default) {
return Task.FromResult(0L);
}
}
}

View File

@ -14,61 +14,55 @@ namespace DHT.Server.Database.Repositories;
public interface IDownloadRepository { public interface IDownloadRepository {
IObservable<long> TotalCount { get; } IObservable<long> TotalCount { get; }
Task AddDownload(DownloadWithData item); Task AddDownload(Data.Download download);
Task<long> Count(DownloadItemFilter filter, CancellationToken cancellationToken = default); Task<DownloadStatusStatistics> GetStatistics(CancellationToken cancellationToken = default);
Task<DownloadStatusStatistics> GetStatistics(DownloadItemFilter nonSkippedFilter, CancellationToken cancellationToken = default); IAsyncEnumerable<Data.Download> GetWithoutData();
IAsyncEnumerable<Data.Download> Get();
Task<DownloadWithData> HydrateWithData(Data.Download download); Task<Data.Download> HydrateWithData(Data.Download download);
Task<DownloadWithData?> GetSuccessfulDownloadWithData(string normalizedUrl); Task<DownloadedAttachment?> GetDownloadedAttachment(string normalizedUrl);
IAsyncEnumerable<DownloadItem> PullPendingDownloadItems(int count, DownloadItemFilter filter, CancellationToken cancellationToken = default); Task<int> EnqueueDownloadItems(AttachmentFilter? filter = null, CancellationToken cancellationToken = default);
Task MoveDownloadingItemsBackToQueue(CancellationToken cancellationToken = default); IAsyncEnumerable<DownloadItem> PullEnqueuedDownloadItems(int count, CancellationToken cancellationToken = default);
Task<int> RetryFailed(CancellationToken cancellationToken = default); Task RemoveDownloadItems(DownloadItemFilter? filter, FilterRemovalMode mode);
internal sealed class Dummy : IDownloadRepository { internal sealed class Dummy : IDownloadRepository {
public IObservable<long> TotalCount { get; } = Observable.Return(0L); public IObservable<long> TotalCount { get; } = Observable.Return(0L);
public Task AddDownload(DownloadWithData item) { public Task AddDownload(Data.Download download) {
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task<long> Count(DownloadItemFilter filter, CancellationToken cancellationToken) { public Task<DownloadStatusStatistics> GetStatistics(CancellationToken cancellationToken) {
return Task.FromResult(0L);
}
public Task<DownloadStatusStatistics> GetStatistics(DownloadItemFilter nonSkippedFilter, CancellationToken cancellationToken) {
return Task.FromResult(new DownloadStatusStatistics()); return Task.FromResult(new DownloadStatusStatistics());
} }
public IAsyncEnumerable<Data.Download> Get() { public IAsyncEnumerable<Data.Download> GetWithoutData() {
return AsyncEnumerable.Empty<Data.Download>(); return AsyncEnumerable.Empty<Data.Download>();
} }
public Task<DownloadWithData> HydrateWithData(Data.Download download) { public Task<Data.Download> HydrateWithData(Data.Download download) {
return Task.FromResult(new DownloadWithData(download, Data: null)); return Task.FromResult(download);
} }
public Task<DownloadWithData?> GetSuccessfulDownloadWithData(string normalizedUrl) { public Task<DownloadedAttachment?> GetDownloadedAttachment(string normalizedUrl) {
return Task.FromResult<DownloadWithData?>(null); return Task.FromResult<DownloadedAttachment?>(null);
} }
public IAsyncEnumerable<DownloadItem> PullPendingDownloadItems(int count, DownloadItemFilter filter, CancellationToken cancellationToken) { public Task<int> EnqueueDownloadItems(AttachmentFilter? filter, CancellationToken cancellationToken) {
return Task.FromResult(0);
}
public IAsyncEnumerable<DownloadItem> PullEnqueuedDownloadItems(int count, CancellationToken cancellationToken) {
return AsyncEnumerable.Empty<DownloadItem>(); return AsyncEnumerable.Empty<DownloadItem>();
} }
public Task MoveDownloadingItemsBackToQueue(CancellationToken cancellationToken) { public Task RemoveDownloadItems(DownloadItemFilter? filter, FilterRemovalMode mode) {
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task<int> RetryFailed(CancellationToken cancellationToken) {
return Task.FromResult(0);
}
} }
} }

View File

@ -1,20 +1,16 @@
using System; using System;
using System.Reactive.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Utils.Logging;
using DHT.Utils.Tasks; using DHT.Utils.Tasks;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
abstract class BaseSqliteRepository : IDisposable { abstract class BaseSqliteRepository : IDisposable {
private readonly ObservableThrottledTask<long> totalCountTask; private readonly ObservableThrottledTask<long> totalCountTask = new (TaskScheduler.Default);
public IObservable<long> TotalCount => totalCountTask;
public IObservable<long> TotalCount { get; } protected BaseSqliteRepository() {
protected BaseSqliteRepository(Log log) {
totalCountTask = new ObservableThrottledTask<long>(log, TaskScheduler.Default);
TotalCount = totalCountTask.DistinctUntilChanged();
UpdateTotalCount(); UpdateTotalCount();
} }
@ -25,6 +21,6 @@ abstract class BaseSqliteRepository : IDisposable {
protected void UpdateTotalCount() { protected void UpdateTotalCount() {
totalCountTask.Post(Count); totalCountTask.Post(Count);
} }
public abstract Task<long> Count(CancellationToken cancellationToken); public abstract Task<long> Count(CancellationToken cancellationToken);
} }

View File

@ -0,0 +1,28 @@
using System.Threading;
using System.Threading.Tasks;
using DHT.Server.Data.Filters;
using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils;
namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteAttachmentRepository : BaseSqliteRepository, IAttachmentRepository {
private readonly SqliteConnectionPool pool;
public SqliteAttachmentRepository(SqliteConnectionPool pool) {
this.pool = pool;
}
internal new void UpdateTotalCount() {
base.UpdateTotalCount();
}
public override Task<long> Count(CancellationToken cancellationToken) {
return Count(filter: null, cancellationToken);
}
public async Task<long> Count(AttachmentFilter? filter, CancellationToken cancellationToken) {
await using var conn = await pool.Take();
return await conn.ExecuteReaderAsync("SELECT COUNT(DISTINCT normalized_url) FROM attachments a" + filter.GenerateWhereClause("a"), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
}
}

View File

@ -4,17 +4,14 @@ using System.Threading.Tasks;
using DHT.Server.Data; using DHT.Server.Data;
using DHT.Server.Database.Repositories; using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils; using DHT.Server.Database.Sqlite.Utils;
using DHT.Utils.Logging;
using Microsoft.Data.Sqlite; using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteChannelRepository : BaseSqliteRepository, IChannelRepository { sealed class SqliteChannelRepository : BaseSqliteRepository, IChannelRepository {
private static readonly Log Log = Log.ForType<SqliteChannelRepository>();
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
public SqliteChannelRepository(SqliteConnectionPool pool) : base(Log) { public SqliteChannelRepository(SqliteConnectionPool pool) {
this.pool = pool; this.pool = pool;
} }

View File

@ -9,204 +9,140 @@ using DHT.Server.Data.Filters;
using DHT.Server.Database.Repositories; using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils; using DHT.Server.Database.Sqlite.Utils;
using DHT.Server.Download; using DHT.Server.Download;
using DHT.Utils.Logging;
using Microsoft.Data.Sqlite; using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepository { sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepository {
private static readonly Log Log = Log.ForType<SqliteDownloadRepository>();
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
public SqliteDownloadRepository(SqliteConnectionPool pool) : base(Log) { public SqliteDownloadRepository(SqliteConnectionPool pool) {
this.pool = pool; this.pool = pool;
} }
internal sealed class NewDownloadCollector : IAsyncDisposable { public async Task AddDownload(Data.Download download) {
private readonly SqliteDownloadRepository repository;
private bool hasAdded = false;
private readonly SqliteCommand metadataCmd;
public NewDownloadCollector(SqliteDownloadRepository repository, ISqliteConnection conn) {
this.repository = repository;
metadataCmd = conn.Command(
"""
INSERT INTO download_metadata (normalized_url, download_url, status, type, size)
VALUES (:normalized_url, :download_url, :status, :type, :size)
ON CONFLICT DO NOTHING
"""
);
metadataCmd.Add(":normalized_url", SqliteType.Text);
metadataCmd.Add(":download_url", SqliteType.Text);
metadataCmd.Add(":status", SqliteType.Integer);
metadataCmd.Add(":type", SqliteType.Text);
metadataCmd.Add(":size", SqliteType.Integer);
}
public async Task Add(Data.Download download) {
metadataCmd.Set(":normalized_url", download.NormalizedUrl);
metadataCmd.Set(":download_url", download.DownloadUrl);
metadataCmd.Set(":status", (int) download.Status);
metadataCmd.Set(":type", download.Type);
metadataCmd.Set(":size", download.Size);
hasAdded |= await metadataCmd.ExecuteNonQueryAsync() > 0;
}
public void OnCommitted() {
if (hasAdded) {
repository.UpdateTotalCount();
}
}
public async ValueTask DisposeAsync() {
await metadataCmd.DisposeAsync();
}
}
public async Task AddDownload(DownloadWithData item) {
var (download, data) = item;
await using (var conn = await pool.Take()) { await using (var conn = await pool.Take()) {
var tx = await conn.BeginTransactionAsync(); await using var cmd = conn.Upsert("downloads", [
await using var metadataCmd = conn.Upsert("download_metadata", [
("normalized_url", SqliteType.Text), ("normalized_url", SqliteType.Text),
("download_url", SqliteType.Text), ("download_url", SqliteType.Text),
("status", SqliteType.Integer), ("status", SqliteType.Integer),
("type", SqliteType.Text),
("size", SqliteType.Integer), ("size", SqliteType.Integer),
("blob", SqliteType.Blob)
]); ]);
metadataCmd.Set(":normalized_url", download.NormalizedUrl); cmd.Set(":normalized_url", download.NormalizedUrl);
metadataCmd.Set(":download_url", download.DownloadUrl); cmd.Set(":download_url", download.DownloadUrl);
metadataCmd.Set(":status", (int) download.Status); cmd.Set(":status", (int) download.Status);
metadataCmd.Set(":type", download.Type); cmd.Set(":size", download.Size);
metadataCmd.Set(":size", download.Size); cmd.Set(":blob", download.Data);
await metadataCmd.ExecuteNonQueryAsync(); await cmd.ExecuteNonQueryAsync();
if (data == null) {
await using var deleteBlobCmd = conn.Command("DELETE FROM download_blobs WHERE normalized_url = :normalized_url");
deleteBlobCmd.AddAndSet(":normalized_url", SqliteType.Text, download.NormalizedUrl);
await deleteBlobCmd.ExecuteNonQueryAsync();
}
else {
await using var upsertBlobCmd = conn.Upsert("download_blobs", [
("normalized_url", SqliteType.Text),
("blob", SqliteType.Blob)
]);
upsertBlobCmd.Set(":normalized_url", download.NormalizedUrl);
upsertBlobCmd.Set(":blob", data);
await upsertBlobCmd.ExecuteNonQueryAsync();
}
await tx.CommitAsync();
} }
UpdateTotalCount(); UpdateTotalCount();
} }
public override Task<long> Count(CancellationToken cancellationToken) { public override async Task<long> Count(CancellationToken cancellationToken) {
return Count(filter: null, cancellationToken); await using var conn = await pool.Take();
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM downloads", static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
} }
public async Task<long> Count(DownloadItemFilter? filter, CancellationToken cancellationToken) { public async Task<DownloadStatusStatistics> GetStatistics(CancellationToken cancellationToken) {
await using var conn = await pool.Take(); static async Task LoadUndownloadedStatistics(ISqliteConnection conn, DownloadStatusStatistics result, CancellationToken cancellationToken) {
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM download_metadata" + filter.GenerateConditions().BuildWhereClause(), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken); await using var cmd = conn.Command(
} """
SELECT IFNULL(COUNT(size), 0), IFNULL(SUM(size), 0)
FROM (SELECT MAX(a.size) size
FROM attachments a
WHERE a.normalized_url NOT IN (SELECT d.normalized_url FROM downloads d)
GROUP BY a.normalized_url)
""");
public async Task<DownloadStatusStatistics> GetStatistics(DownloadItemFilter nonSkippedFilter, CancellationToken cancellationToken) { await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
nonSkippedFilter.IncludeStatuses = null;
nonSkippedFilter.ExcludeStatuses = null;
string nonSkippedFilterConditions = nonSkippedFilter.GenerateConditions().Build();
await using var conn = await pool.Take(); if (await reader.ReadAsync(cancellationToken)) {
result.SkippedCount = reader.GetInt32(0);
await using var cmd = conn.Command( result.SkippedSize = reader.GetUint64(1);
$""" }
SELECT
IFNULL(SUM(CASE WHEN (status = :downloading) OR (status = :pending AND {nonSkippedFilterConditions}) THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN (status = :downloading) OR (status = :pending AND {nonSkippedFilterConditions}) THEN IFNULL(size, 0) ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN ((status = :downloading) OR (status = :pending AND {nonSkippedFilterConditions})) AND size IS NULL THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :success THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :success THEN IFNULL(size, 0) ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :success AND size IS NULL THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status NOT IN (:pending, :downloading, :success) THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status NOT IN (:pending, :downloading, :success) THEN IFNULL(size, 0) ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status NOT IN (:pending, :downloading, :success) AND size IS NULL THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :pending AND NOT ({nonSkippedFilterConditions}) THEN 1 ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :pending AND NOT ({nonSkippedFilterConditions}) THEN IFNULL(size, 0) ELSE 0 END), 0),
IFNULL(SUM(CASE WHEN status = :pending AND NOT ({nonSkippedFilterConditions}) AND size IS NULL THEN 1 ELSE 0 END), 0)
FROM download_metadata
"""
);
cmd.AddAndSet(":pending", SqliteType.Integer, (int) DownloadStatus.Pending);
cmd.AddAndSet(":downloading", SqliteType.Integer, (int) DownloadStatus.Downloading);
cmd.AddAndSet(":success", SqliteType.Integer, (int) DownloadStatus.Success);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken)) {
return new DownloadStatusStatistics();
} }
return new DownloadStatusStatistics { static async Task LoadSuccessStatistics(ISqliteConnection conn, DownloadStatusStatistics result, CancellationToken cancellationToken) {
PendingCount = reader.GetInt32(0), await using var cmd = conn.Command(
PendingTotalSize = reader.GetUint64(1), """
PendingWithUnknownSizeCount = reader.GetInt32(2), SELECT
SuccessfulCount = reader.GetInt32(3), IFNULL(SUM(CASE WHEN status IN (:enqueued, :downloading) THEN 1 ELSE 0 END), 0),
SuccessfulTotalSize = reader.GetUint64(4), IFNULL(SUM(CASE WHEN status IN (:enqueued, :downloading) THEN size ELSE 0 END), 0),
SuccessfulWithUnknownSizeCount = reader.GetInt32(5), IFNULL(SUM(CASE WHEN status = :success THEN 1 ELSE 0 END), 0),
FailedCount = reader.GetInt32(6), IFNULL(SUM(CASE WHEN status = :success THEN size ELSE 0 END), 0),
FailedTotalSize = reader.GetUint64(7), IFNULL(SUM(CASE WHEN status NOT IN (:enqueued, :downloading) AND status != :success THEN 1 ELSE 0 END), 0),
FailedWithUnknownSizeCount = reader.GetInt32(8), IFNULL(SUM(CASE WHEN status NOT IN (:enqueued, :downloading) AND status != :success THEN size ELSE 0 END), 0)
SkippedCount = reader.GetInt32(9), FROM downloads
SkippedTotalSize = reader.GetUint64(10), """
SkippedWithUnknownSizeCount = reader.GetInt32(11) );
};
cmd.AddAndSet(":enqueued", SqliteType.Integer, (int) DownloadStatus.Enqueued);
cmd.AddAndSet(":downloading", SqliteType.Integer, (int) DownloadStatus.Downloading);
cmd.AddAndSet(":success", SqliteType.Integer, (int) DownloadStatus.Success);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken)) {
result.EnqueuedCount = reader.GetInt32(0);
result.EnqueuedSize = reader.GetUint64(1);
result.SuccessfulCount = reader.GetInt32(2);
result.SuccessfulSize = reader.GetUint64(3);
result.FailedCount = reader.GetInt32(4);
result.FailedSize = reader.GetUint64(5);
}
}
var result = new DownloadStatusStatistics();
await using var conn = await pool.Take();
await LoadUndownloadedStatistics(conn, result, cancellationToken);
await LoadSuccessStatistics(conn, result, cancellationToken);
return result;
} }
public async IAsyncEnumerable<Data.Download> Get() { public async IAsyncEnumerable<Data.Download> GetWithoutData() {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
await using var cmd = conn.Command("SELECT normalized_url, download_url, status, type, size FROM download_metadata"); await using var cmd = conn.Command("SELECT normalized_url, download_url, status, size FROM downloads");
await using var reader = await cmd.ExecuteReaderAsync(); await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) { while (await reader.ReadAsync()) {
string normalizedUrl = reader.GetString(0); string normalizedUrl = reader.GetString(0);
string downloadUrl = reader.GetString(1); string downloadUrl = reader.GetString(1);
var status = (DownloadStatus) reader.GetInt32(2); var status = (DownloadStatus) reader.GetInt32(2);
string? type = reader.IsDBNull(3) ? null : reader.GetString(3); ulong size = reader.GetUint64(3);
ulong? size = reader.IsDBNull(4) ? null : reader.GetUint64(4);
yield return new Data.Download(normalizedUrl, downloadUrl, status, type, size); yield return new Data.Download(normalizedUrl, downloadUrl, status, size);
} }
} }
public async Task<DownloadWithData> HydrateWithData(Data.Download download) { public async Task<Data.Download> HydrateWithData(Data.Download download) {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
await using var cmd = conn.Command("SELECT blob FROM download_blobs WHERE normalized_url = :url"); await using var cmd = conn.Command("SELECT blob FROM downloads WHERE normalized_url = :url");
cmd.AddAndSet(":url", SqliteType.Text, download.NormalizedUrl); cmd.AddAndSet(":url", SqliteType.Text, download.NormalizedUrl);
await using var reader = await cmd.ExecuteReaderAsync(); await using var reader = await cmd.ExecuteReaderAsync();
var data = await reader.ReadAsync() && !reader.IsDBNull(0) ? (byte[]) reader["blob"] : null;
if (await reader.ReadAsync() && !reader.IsDBNull(0)) {
return new DownloadWithData(download, data); return download.WithData((byte[]) reader["blob"]);
}
else {
return download;
}
} }
public async Task<DownloadWithData?> GetSuccessfulDownloadWithData(string normalizedUrl) { public async Task<DownloadedAttachment?> GetDownloadedAttachment(string normalizedUrl) {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
await using var cmd = conn.Command( await using var cmd = conn.Command(
""" """
SELECT dm.download_url, dm.type, db.blob FROM download_metadata dm SELECT a.type, d.blob FROM downloads d
JOIN download_blobs db ON dm.normalized_url = db.normalized_url LEFT JOIN attachments a ON d.normalized_url = a.normalized_url
WHERE dm.normalized_url = :normalized_url AND dm.status = :success IS NOT NULL WHERE d.normalized_url = :normalized_url AND d.status = :success AND d.blob IS NOT NULL
""" """
); );
@ -219,31 +155,36 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
return null; return null;
} }
var downloadUrl = reader.GetString(0); return new DownloadedAttachment {
var type = reader.IsDBNull(1) ? null : reader.GetString(1); Type = reader.IsDBNull(0) ? null : reader.GetString(0),
var data = (byte[]) reader[2]; Data = (byte[]) reader["blob"],
var size = (ulong) data.LongLength; };
var download = new Data.Download(normalizedUrl, downloadUrl, DownloadStatus.Success, type, size);
return new DownloadWithData(download, data);
} }
public async IAsyncEnumerable<DownloadItem> PullPendingDownloadItems(int count, DownloadItemFilter filter, [EnumeratorCancellation] CancellationToken cancellationToken) { public async Task<int> EnqueueDownloadItems(AttachmentFilter? filter, CancellationToken cancellationToken) {
filter.IncludeStatuses = [DownloadStatus.Pending]; await using var conn = await pool.Take();
filter.ExcludeStatuses = null;
await using var cmd = conn.Command(
$"""
INSERT INTO downloads (normalized_url, download_url, status, size)
SELECT a.normalized_url, a.download_url, :enqueued, MAX(a.size)
FROM attachments a
{filter.GenerateWhereClause("a")}
GROUP BY a.normalized_url
"""
);
cmd.AddAndSet(":enqueued", SqliteType.Integer, (int) DownloadStatus.Enqueued);
return await cmd.ExecuteNonQueryAsync(cancellationToken);
}
public async IAsyncEnumerable<DownloadItem> PullEnqueuedDownloadItems(int count, [EnumeratorCancellation] CancellationToken cancellationToken) {
var found = new List<DownloadItem>(); var found = new List<DownloadItem>();
await using var conn = await pool.Take(); await using var conn = await pool.Take();
var sql = $""" await using (var cmd = conn.Command("SELECT normalized_url, download_url, size FROM downloads WHERE status = :enqueued LIMIT :limit")) {
SELECT normalized_url, download_url, type, size cmd.AddAndSet(":enqueued", SqliteType.Integer, (int) DownloadStatus.Enqueued);
FROM download_metadata
{filter.GenerateConditions().BuildWhereClause()}
LIMIT :limit
""";
await using (var cmd = conn.Command(sql)) {
cmd.AddAndSet(":limit", SqliteType.Integer, Math.Max(0, count)); cmd.AddAndSet(":limit", SqliteType.Integer, Math.Max(0, count));
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
@ -252,15 +193,14 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
found.Add(new DownloadItem { found.Add(new DownloadItem {
NormalizedUrl = reader.GetString(0), NormalizedUrl = reader.GetString(0),
DownloadUrl = reader.GetString(1), DownloadUrl = reader.GetString(1),
Type = reader.IsDBNull(2) ? null : reader.GetString(2), Size = reader.GetUint64(2),
Size = reader.IsDBNull(3) ? null : reader.GetUint64(3)
}); });
} }
} }
if (found.Count != 0) { if (found.Count != 0) {
await using var cmd = conn.Command("UPDATE download_metadata SET status = :downloading WHERE normalized_url = :normalized_url AND status = :pending"); await using var cmd = conn.Command("UPDATE downloads SET status = :downloading WHERE normalized_url = :normalized_url AND status = :enqueued");
cmd.AddAndSet(":pending", SqliteType.Integer, (int) DownloadStatus.Pending); cmd.AddAndSet(":enqueued", SqliteType.Integer, (int) DownloadStatus.Enqueued);
cmd.AddAndSet(":downloading", SqliteType.Integer, (int) DownloadStatus.Downloading); cmd.AddAndSet(":downloading", SqliteType.Integer, (int) DownloadStatus.Downloading);
cmd.Add(":normalized_url", SqliteType.Text); cmd.Add(":normalized_url", SqliteType.Text);
@ -274,23 +214,17 @@ sealed class SqliteDownloadRepository : BaseSqliteRepository, IDownloadRepositor
} }
} }
public async Task MoveDownloadingItemsBackToQueue(CancellationToken cancellationToken) { public async Task RemoveDownloadItems(DownloadItemFilter? filter, FilterRemovalMode mode) {
await using var conn = await pool.Take(); await using (var conn = await pool.Take()) {
await conn.ExecuteAsync(
$"""
-- noinspection SqlWithoutWhere
DELETE FROM downloads
{filter.GenerateWhereClause(invert: mode == FilterRemovalMode.KeepMatching)}
"""
);
}
await using var cmd = conn.Command("UPDATE download_metadata SET status = :pending WHERE status = :downloading"); UpdateTotalCount();
cmd.AddAndSet(":pending", SqliteType.Integer, (int) DownloadStatus.Pending);
cmd.AddAndSet(":downloading", SqliteType.Integer, (int) DownloadStatus.Downloading);
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
public async Task<int> RetryFailed(CancellationToken cancellationToken) {
await using var conn = await pool.Take();
await using var cmd = conn.Command("UPDATE download_metadata SET status = :pending WHERE status = :generic_error OR (status > :last_custom_code AND status != :success)");
cmd.AddAndSet(":pending", SqliteType.Integer, (int) DownloadStatus.Pending);
cmd.AddAndSet(":generic_error", SqliteType.Integer, (int) DownloadStatus.GenericError);
cmd.AddAndSet(":last_custom_code", SqliteType.Integer, (int) DownloadStatus.LastCustomCode);
cmd.AddAndSet(":success", SqliteType.Integer, (int) DownloadStatus.Success);
return await cmd.ExecuteNonQueryAsync(cancellationToken);
} }
} }

View File

@ -7,21 +7,17 @@ using DHT.Server.Data;
using DHT.Server.Data.Filters; using DHT.Server.Data.Filters;
using DHT.Server.Database.Repositories; using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils; using DHT.Server.Database.Sqlite.Utils;
using DHT.Server.Download;
using DHT.Utils.Logging;
using Microsoft.Data.Sqlite; using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository { sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository {
private static readonly Log Log = Log.ForType<SqliteMessageRepository>();
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
private readonly SqliteDownloadRepository downloads; private readonly SqliteAttachmentRepository attachments;
public SqliteMessageRepository(SqliteConnectionPool pool, SqliteDownloadRepository downloads) : base(Log) { public SqliteMessageRepository(SqliteConnectionPool pool, SqliteAttachmentRepository attachments) {
this.pool = pool; this.pool = pool;
this.downloads = downloads; this.attachments = attachments;
} }
public async Task Add(IReadOnlyList<Message> messages) { public async Task Add(IReadOnlyList<Message> messages) {
@ -38,6 +34,8 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
await cmd.ExecuteNonQueryAsync(); await cmd.ExecuteNonQueryAsync();
} }
bool addedAttachments = false;
await using (var conn = await pool.Take()) { await using (var conn = await pool.Take()) {
await using var tx = await conn.BeginTransactionAsync(); await using var tx = await conn.BeginTransactionAsync();
@ -90,8 +88,6 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
("emoji_flags", SqliteType.Integer), ("emoji_flags", SqliteType.Integer),
("count", SqliteType.Integer) ("count", SqliteType.Integer)
]); ]);
await using var downloadCollector = new SqliteDownloadRepository.NewDownloadCollector(downloads, conn);
foreach (var message in messages) { foreach (var message in messages) {
object messageId = message.Id; object messageId = message.Id;
@ -123,6 +119,8 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
} }
if (!message.Attachments.IsEmpty) { if (!message.Attachments.IsEmpty) {
addedAttachments = true;
foreach (var attachment in message.Attachments) { foreach (var attachment in message.Attachments) {
attachmentCmd.Set(":message_id", messageId); attachmentCmd.Set(":message_id", messageId);
attachmentCmd.Set(":attachment_id", attachment.Id); attachmentCmd.Set(":attachment_id", attachment.Id);
@ -134,8 +132,6 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
attachmentCmd.Set(":width", attachment.Width); attachmentCmd.Set(":width", attachment.Width);
attachmentCmd.Set(":height", attachment.Height); attachmentCmd.Set(":height", attachment.Height);
await attachmentCmd.ExecuteNonQueryAsync(); await attachmentCmd.ExecuteNonQueryAsync();
await downloadCollector.Add(DownloadLinkExtractor.FromAttachment(attachment));
} }
} }
@ -144,10 +140,6 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
embedCmd.Set(":message_id", messageId); embedCmd.Set(":message_id", messageId);
embedCmd.Set(":json", embed.Json); embedCmd.Set(":json", embed.Json);
await embedCmd.ExecuteNonQueryAsync(); await embedCmd.ExecuteNonQueryAsync();
if (DownloadLinkExtractor.TryFromEmbedJson(embed.Json) is {} download) {
await downloadCollector.Add(download);
}
} }
} }
@ -159,19 +151,18 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
reactionCmd.Set(":emoji_flags", (int) reaction.EmojiFlags); reactionCmd.Set(":emoji_flags", (int) reaction.EmojiFlags);
reactionCmd.Set(":count", reaction.Count); reactionCmd.Set(":count", reaction.Count);
await reactionCmd.ExecuteNonQueryAsync(); await reactionCmd.ExecuteNonQueryAsync();
if (reaction.EmojiId is {} emojiId) {
await downloadCollector.Add(DownloadLinkExtractor.FromEmoji(emojiId, reaction.EmojiFlags));
}
} }
} }
} }
await tx.CommitAsync(); await tx.CommitAsync();
downloadCollector.OnCommitted();
} }
UpdateTotalCount(); UpdateTotalCount();
if (addedAttachments) {
attachments.UpdateTotalCount();
}
} }
public override Task<long> Count(CancellationToken cancellationToken) { public override Task<long> Count(CancellationToken cancellationToken) {
@ -180,7 +171,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
public async Task<long> Count(MessageFilter? filter, CancellationToken cancellationToken) { public async Task<long> Count(MessageFilter? filter, CancellationToken cancellationToken) {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM messages" + filter.GenerateConditions().BuildWhereClause(), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken); return await conn.ExecuteReaderAsync("SELECT COUNT(*) FROM messages" + filter.GenerateWhereClause(), static reader => reader?.GetInt64(0) ?? 0L, cancellationToken);
} }
private sealed class MesageToManyCommand<T> : IAsyncDisposable { private sealed class MesageToManyCommand<T> : IAsyncDisposable {
@ -265,7 +256,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
FROM messages m FROM messages m
LEFT JOIN edit_timestamps et ON m.message_id = et.message_id LEFT JOIN edit_timestamps et ON m.message_id = et.message_id
LEFT JOIN replied_to rt ON m.message_id = rt.message_id LEFT JOIN replied_to rt ON m.message_id = rt.message_id
{filter.GenerateConditions("m").BuildWhereClause()} {filter.GenerateWhereClause("m")}
""" """
); );
@ -292,7 +283,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
public async IAsyncEnumerable<ulong> GetIds(MessageFilter? filter) { public async IAsyncEnumerable<ulong> GetIds(MessageFilter? filter) {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
await using var cmd = conn.Command("SELECT message_id FROM messages" + filter.GenerateConditions().BuildWhereClause()); await using var cmd = conn.Command("SELECT message_id FROM messages" + filter.GenerateWhereClause());
await using var reader = await cmd.ExecuteReaderAsync(); await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) { while (await reader.ReadAsync()) {
@ -306,7 +297,7 @@ sealed class SqliteMessageRepository : BaseSqliteRepository, IMessageRepository
$""" $"""
-- noinspection SqlWithoutWhere -- noinspection SqlWithoutWhere
DELETE FROM messages DELETE FROM messages
{filter.GenerateConditions(invert: mode == FilterRemovalMode.KeepMatching).BuildWhereClause()} {filter.GenerateWhereClause(invert: mode == FilterRemovalMode.KeepMatching)}
""" """
); );
} }

View File

@ -4,17 +4,14 @@ using System.Threading.Tasks;
using DHT.Server.Data; using DHT.Server.Data;
using DHT.Server.Database.Repositories; using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils; using DHT.Server.Database.Sqlite.Utils;
using DHT.Utils.Logging;
using Microsoft.Data.Sqlite; using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteServerRepository : BaseSqliteRepository, IServerRepository { sealed class SqliteServerRepository : BaseSqliteRepository, IServerRepository {
private static readonly Log Log = Log.ForType<SqliteServerRepository>();
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
public SqliteServerRepository(SqliteConnectionPool pool) : base(Log) { public SqliteServerRepository(SqliteConnectionPool pool) {
this.pool = pool; this.pool = pool;
} }

View File

@ -4,27 +4,21 @@ using System.Threading.Tasks;
using DHT.Server.Data; using DHT.Server.Data;
using DHT.Server.Database.Repositories; using DHT.Server.Database.Repositories;
using DHT.Server.Database.Sqlite.Utils; using DHT.Server.Database.Sqlite.Utils;
using DHT.Server.Download;
using DHT.Utils.Logging;
using Microsoft.Data.Sqlite; using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Repositories; namespace DHT.Server.Database.Sqlite.Repositories;
sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository { sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository {
private static readonly Log Log = Log.ForType<SqliteUserRepository>();
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
private readonly SqliteDownloadRepository downloads;
public SqliteUserRepository(SqliteConnectionPool pool) {
public SqliteUserRepository(SqliteConnectionPool pool, SqliteDownloadRepository downloads) : base(Log) {
this.pool = pool; this.pool = pool;
this.downloads = downloads;
} }
public async Task Add(IReadOnlyList<User> users) { public async Task Add(IReadOnlyList<User> users) {
await using (var conn = await pool.Take()) { await using (var conn = await pool.Take()) {
await using var tx = await conn.BeginTransactionAsync(); await using var tx = await conn.BeginTransactionAsync();
await using var cmd = conn.Upsert("users", [ await using var cmd = conn.Upsert("users", [
("id", SqliteType.Integer), ("id", SqliteType.Integer),
("name", SqliteType.Text), ("name", SqliteType.Text),
@ -32,22 +26,15 @@ sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository {
("discriminator", SqliteType.Text) ("discriminator", SqliteType.Text)
]); ]);
await using var downloadCollector = new SqliteDownloadRepository.NewDownloadCollector(downloads, conn);
foreach (var user in users) { foreach (var user in users) {
cmd.Set(":id", user.Id); cmd.Set(":id", user.Id);
cmd.Set(":name", user.Name); cmd.Set(":name", user.Name);
cmd.Set(":avatar_url", user.AvatarUrl); cmd.Set(":avatar_url", user.AvatarUrl);
cmd.Set(":discriminator", user.Discriminator); cmd.Set(":discriminator", user.Discriminator);
await cmd.ExecuteNonQueryAsync(); await cmd.ExecuteNonQueryAsync();
if (user.AvatarUrl is {} avatarUrl) {
await downloadCollector.Add(DownloadLinkExtractor.FromUserAvatar(user.Id, avatarUrl));
}
} }
await tx.CommitAsync(); await tx.CommitAsync();
downloadCollector.OnCommitted();
} }
UpdateTotalCount(); UpdateTotalCount();
@ -60,7 +47,7 @@ sealed class SqliteUserRepository : BaseSqliteRepository, IUserRepository {
public async IAsyncEnumerable<User> Get() { public async IAsyncEnumerable<User> Get() {
await using var conn = await pool.Take(); await using var conn = await pool.Take();
await using var cmd = conn.Command("SELECT id, name, avatar_url, discriminator FROM users"); await using var cmd = conn.Command("SELECT id, name, avatar_url, discriminator FROM users");
await using var reader = await cmd.ExecuteReaderAsync(); await using var reader = await cmd.ExecuteReaderAsync();

View File

@ -1,155 +0,0 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using DHT.Server.Data;
using DHT.Server.Database.Sqlite.Utils;
using DHT.Server.Download;
using Microsoft.Data.Sqlite;
namespace DHT.Server.Database.Sqlite.Schema;
sealed class SqliteSchemaUpgradeTo7 : ISchemaUpgrade {
async Task ISchemaUpgrade.Run(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
await reporter.MainWork("Applying schema changes...", 0, 6);
await SqliteSchema.CreateDownloadTables(conn);
await reporter.MainWork("Migrating download metadata...", 1, 6);
await conn.ExecuteAsync("INSERT INTO download_metadata (normalized_url, download_url, status, size) SELECT normalized_url, download_url, status, size FROM downloads");
await reporter.MainWork("Merging attachment metadata...", 2, 6);
await conn.ExecuteAsync("UPDATE download_metadata SET type = (SELECT type FROM attachments WHERE download_metadata.normalized_url = attachments.normalized_url)");
await reporter.MainWork("Migrating downloaded files...", 3, 6);
await MigrateDownloadBlobsToNewTable(conn, reporter);
await reporter.MainWork("Applying schema changes...", 4, 6);
await conn.ExecuteAsync("DROP TABLE downloads");
await reporter.MainWork("Discovering downloadable links...", 5, 6);
await DiscoverDownloadableLinks(conn, reporter);
}
private async Task MigrateDownloadBlobsToNewTable(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
await reporter.SubWork("Listing downloaded files...", 0, 0);
var urlsToMigrate = await GetDownloadedFileUrls(conn);
int totalFiles = urlsToMigrate.Count;
int processedFiles = -1;
await reporter.SubWork("Processing downloaded files...", 0, totalFiles);
var tx = await conn.BeginTransactionAsync();
await using (var insertCmd = conn.Command("INSERT INTO download_blobs (normalized_url, blob) SELECT normalized_url, blob FROM downloads WHERE normalized_url = :normalized_url"))
await using (var deleteCmd = conn.Command("DELETE FROM downloads WHERE normalized_url = :normalized_url")) {
insertCmd.Add(":normalized_url", SqliteType.Text);
deleteCmd.Add(":normalized_url", SqliteType.Text);
foreach (var url in urlsToMigrate) {
if (++processedFiles % 10 == 0) {
await reporter.SubWork("Processing downloaded files...", processedFiles, totalFiles);
// Not proper way of dealing with transactions, but it avoids a long commit at the end.
// Schema upgrades are already non-atomic anyways, so this doesn't make it worse.
await tx.CommitAsync();
await tx.DisposeAsync();
tx = await conn.BeginTransactionAsync();
insertCmd.Transaction = (SqliteTransaction) tx;
deleteCmd.Transaction = (SqliteTransaction) tx;
}
insertCmd.Set(":normalized_url", url);
await insertCmd.ExecuteNonQueryAsync();
deleteCmd.Set(":normalized_url", url);
await deleteCmd.ExecuteNonQueryAsync();
}
}
await reporter.SubWork("Processing downloaded files...", totalFiles, totalFiles);
await tx.CommitAsync();
await tx.DisposeAsync();
}
private async Task<List<string>> GetDownloadedFileUrls(ISqliteConnection conn) {
var urls = new List<string>();
await using var selectCmd = conn.Command("SELECT normalized_url FROM downloads WHERE blob IS NOT NULL");
await using var reader = await selectCmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) {
urls.Add(reader.GetString(0));
}
return urls;
}
private async Task DiscoverDownloadableLinks(ISqliteConnection conn, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
await reporter.SubWork("Processing attachments...", 0, 4);
await using (var cmd = conn.Command("""
INSERT OR IGNORE INTO download_metadata (normalized_url, download_url, status, type, size)
SELECT a.normalized_url, a.download_url, :pending, a.type, MAX(a.size)
FROM attachments a
GROUP BY a.normalized_url
""")) {
cmd.AddAndSet(":pending", SqliteType.Integer, (int) DownloadStatus.Pending);
await cmd.ExecuteNonQueryAsync();
}
static async Task InsertDownload(SqliteCommand insertCmd, Data.Download? download) {
if (download == null) {
return;
}
insertCmd.Set(":normalized_url", download.NormalizedUrl);
insertCmd.Set(":download_url", download.DownloadUrl);
insertCmd.Set(":status", (int) download.Status);
insertCmd.Set(":type", download.Type);
insertCmd.Set(":size", download.Size);
await insertCmd.ExecuteNonQueryAsync();
}
await using (var tx = await conn.BeginTransactionAsync()) {
await using var insertCmd = conn.Command("INSERT OR IGNORE INTO download_metadata (normalized_url, download_url, status, type, size) VALUES (:normalized_url, :download_url, :status, :type, :size)");
insertCmd.Add(":normalized_url", SqliteType.Text);
insertCmd.Add(":download_url", SqliteType.Text);
insertCmd.Add(":status", SqliteType.Integer);
insertCmd.Add(":type", SqliteType.Text);
insertCmd.Add(":size", SqliteType.Integer);
await reporter.SubWork("Processing embeds...", 1, 4);
await using (var embedCmd = conn.Command("SELECT json FROM embeds")) {
await using var reader = await embedCmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) {
await InsertDownload(insertCmd, await DownloadLinkExtractor.TryFromEmbedJson(reader.GetStream(0)));
}
}
await reporter.SubWork("Processing users...", 2, 4);
await using (var avatarCmd = conn.Command("SELECT id, avatar_url FROM users WHERE avatar_url IS NOT NULL")) {
await using var reader = await avatarCmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) {
await InsertDownload(insertCmd, DownloadLinkExtractor.FromUserAvatar(reader.GetUint64(0), reader.GetString(1)));
}
}
await reporter.SubWork("Processing reactions...", 3, 4);
await using (var avatarCmd = conn.Command("SELECT DISTINCT emoji_id, emoji_flags FROM reactions WHERE emoji_id IS NOT NULL")) {
await using var reader = await avatarCmd.ExecuteReaderAsync();
while (await reader.ReadAsync()) {
await InsertDownload(insertCmd, DownloadLinkExtractor.FromEmoji(reader.GetUint64(0), (EmojiFlags) reader.GetInt16(1)));
}
}
await tx.CommitAsync();
}
}
}

View File

@ -43,6 +43,7 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
public IServerRepository Servers => servers; public IServerRepository Servers => servers;
public IChannelRepository Channels => channels; public IChannelRepository Channels => channels;
public IMessageRepository Messages => messages; public IMessageRepository Messages => messages;
public IAttachmentRepository Attachments => attachments;
public IDownloadRepository Downloads => downloads; public IDownloadRepository Downloads => downloads;
private readonly SqliteConnectionPool pool; private readonly SqliteConnectionPool pool;
@ -51,17 +52,18 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
private readonly SqliteServerRepository servers; private readonly SqliteServerRepository servers;
private readonly SqliteChannelRepository channels; private readonly SqliteChannelRepository channels;
private readonly SqliteMessageRepository messages; private readonly SqliteMessageRepository messages;
private readonly SqliteAttachmentRepository attachments;
private readonly SqliteDownloadRepository downloads; private readonly SqliteDownloadRepository downloads;
private SqliteDatabaseFile(string path, SqliteConnectionPool pool) { private SqliteDatabaseFile(string path, SqliteConnectionPool pool) {
this.Path = path; this.Path = path;
this.pool = pool; this.pool = pool;
downloads = new SqliteDownloadRepository(pool); users = new SqliteUserRepository(pool);
users = new SqliteUserRepository(pool, downloads);
servers = new SqliteServerRepository(pool); servers = new SqliteServerRepository(pool);
channels = new SqliteChannelRepository(pool); channels = new SqliteChannelRepository(pool);
messages = new SqliteMessageRepository(pool, downloads); messages = new SqliteMessageRepository(pool, attachments = new SqliteAttachmentRepository(pool));
downloads = new SqliteDownloadRepository(pool);
} }
public async ValueTask DisposeAsync() { public async ValueTask DisposeAsync() {
@ -69,6 +71,7 @@ public sealed class SqliteDatabaseFile : IDatabaseFile {
servers.Dispose(); servers.Dispose();
channels.Dispose(); channels.Dispose();
messages.Dispose(); messages.Dispose();
attachments.Dispose();
downloads.Dispose(); downloads.Dispose();
await pool.DisposeAsync(); await pool.DisposeAsync();
} }

View File

@ -8,53 +8,77 @@ using DHT.Server.Database.Sqlite.Utils;
namespace DHT.Server.Database.Sqlite; namespace DHT.Server.Database.Sqlite;
static class SqliteFilters { static class SqliteFilters {
public static SqliteConditionBuilder GenerateConditions(this MessageFilter? filter, string? tableAlias = null, bool invert = false) { private static string WhereAll(bool invert) {
var builder = new SqliteConditionBuilder(tableAlias, invert); return invert ? "WHERE FALSE" : "";
if (filter != null) {
if (filter.StartDate != null) {
builder.AddCondition("timestamp >= " + new DateTimeOffset(filter.StartDate.Value).ToUnixTimeMilliseconds());
}
if (filter.EndDate != null) {
builder.AddCondition("timestamp <= " + new DateTimeOffset(filter.EndDate.Value).ToUnixTimeMilliseconds());
}
if (filter.ChannelIds != null) {
builder.AddCondition("channel_id IN (" + string.Join(",", filter.ChannelIds) + ")");
}
if (filter.UserIds != null) {
builder.AddCondition("sender_id IN (" + string.Join(",", filter.UserIds) + ")");
}
if (filter.MessageIds != null) {
builder.AddCondition("message_id IN (" + string.Join(",", filter.MessageIds) + ")");
}
}
return builder;
} }
public static SqliteConditionBuilder GenerateConditions(this DownloadItemFilter? filter, string? tableAlias = null, bool invert = false) { public static string GenerateWhereClause(this MessageFilter? filter, string? tableAlias = null, bool invert = false) {
var builder = new SqliteConditionBuilder(tableAlias, invert); if (filter == null || filter.IsEmpty) {
return WhereAll(invert);
if (filter != null) {
if (filter.IncludeStatuses != null) {
builder.AddCondition("status IN (" + filter.IncludeStatuses.In() + ")");
}
if (filter.ExcludeStatuses != null) {
builder.AddCondition("status NOT IN (" + filter.ExcludeStatuses.In() + ")");
}
if (filter.MaxBytes != null) {
builder.AddCondition("size IS NOT NULL");
builder.AddCondition("size <= " + filter.MaxBytes);
}
} }
return builder; var where = new SqliteWhereGenerator(tableAlias, invert);
if (filter.StartDate != null) {
where.AddCondition("timestamp >= " + new DateTimeOffset(filter.StartDate.Value).ToUnixTimeMilliseconds());
}
if (filter.EndDate != null) {
where.AddCondition("timestamp <= " + new DateTimeOffset(filter.EndDate.Value).ToUnixTimeMilliseconds());
}
if (filter.ChannelIds != null) {
where.AddCondition("channel_id IN (" + string.Join(",", filter.ChannelIds) + ")");
}
if (filter.UserIds != null) {
where.AddCondition("sender_id IN (" + string.Join(",", filter.UserIds) + ")");
}
if (filter.MessageIds != null) {
where.AddCondition("message_id IN (" + string.Join(",", filter.MessageIds) + ")");
}
return where.Generate();
}
public static string GenerateWhereClause(this AttachmentFilter? filter, string? tableAlias = null, bool invert = false) {
if (filter == null || filter.IsEmpty) {
return WhereAll(invert);
}
var where = new SqliteWhereGenerator(tableAlias, invert);
if (filter.MaxBytes != null) {
where.AddCondition("size <= " + filter.MaxBytes);
}
if (filter.DownloadItemRule == AttachmentFilter.DownloadItemRules.OnlyNotPresent) {
where.AddCondition("normalized_url NOT IN (SELECT normalized_url FROM downloads)");
}
else if (filter.DownloadItemRule == AttachmentFilter.DownloadItemRules.OnlyPresent) {
where.AddCondition("normalized_url IN (SELECT normalized_url FROM downloads)");
}
return where.Generate();
}
public static string GenerateWhereClause(this DownloadItemFilter? filter, string? tableAlias = null, bool invert = false) {
if (filter == null || filter.IsEmpty) {
return WhereAll(invert);
}
var where = new SqliteWhereGenerator(tableAlias, invert);
if (filter.IncludeStatuses != null) {
where.AddCondition("status IN (" + filter.IncludeStatuses.In() + ")");
}
if (filter.ExcludeStatuses != null) {
where.AddCondition("status NOT IN (" + filter.ExcludeStatuses.In() + ")");
}
return where.Generate();
} }
private static string In(this ISet<DownloadStatus> statuses) { private static string In(this ISet<DownloadStatus> statuses) {

View File

@ -8,7 +8,7 @@ using DHT.Utils.Logging;
namespace DHT.Server.Database.Sqlite; namespace DHT.Server.Database.Sqlite;
sealed class SqliteSchema { sealed class SqliteSchema {
internal const int Version = 7; internal const int Version = 6;
private static readonly Log Log = Log.ForType<SqliteSchema>(); private static readonly Log Log = Log.ForType<SqliteSchema>();
@ -116,7 +116,26 @@ sealed class SqliteSchema {
await CreateMessageEditTimestampTable(conn); await CreateMessageEditTimestampTable(conn);
await CreateMessageRepliedToTable(conn); await CreateMessageRepliedToTable(conn);
await CreateDownloadTables(conn);
await conn.ExecuteAsync("""
CREATE TABLE downloads (
normalized_url TEXT NOT NULL PRIMARY KEY,
download_url TEXT,
status INTEGER NOT NULL,
size INTEGER NOT NULL,
blob BLOB
)
""");
await conn.ExecuteAsync("""
CREATE TABLE reactions (
message_id INTEGER NOT NULL,
emoji_id INTEGER,
emoji_name TEXT,
emoji_flags INTEGER NOT NULL,
count INTEGER NOT NULL
)
""");
await conn.ExecuteAsync("CREATE INDEX attachments_message_ix ON attachments(message_id)"); await conn.ExecuteAsync("CREATE INDEX attachments_message_ix ON attachments(message_id)");
await conn.ExecuteAsync("CREATE INDEX embeds_message_ix ON embeds(message_id)"); await conn.ExecuteAsync("CREATE INDEX embeds_message_ix ON embeds(message_id)");
@ -143,26 +162,6 @@ sealed class SqliteSchema {
"""); """);
} }
internal static async Task CreateDownloadTables(ISqliteConnection conn) {
await conn.ExecuteAsync("""
CREATE TABLE download_metadata (
normalized_url TEXT NOT NULL PRIMARY KEY,
download_url TEXT NOT NULL,
status INTEGER NOT NULL,
type TEXT,
size INTEGER
)
""");
await conn.ExecuteAsync("""
CREATE TABLE download_blobs (
normalized_url TEXT NOT NULL PRIMARY KEY,
blob BLOB NOT NULL,
FOREIGN KEY (normalized_url) REFERENCES download_metadata (normalized_url) ON UPDATE CASCADE ON DELETE CASCADE
)
""");
}
private async Task UpgradeSchemas(int dbVersion, ISchemaUpgradeCallbacks.IProgressReporter reporter) { private async Task UpgradeSchemas(int dbVersion, ISchemaUpgradeCallbacks.IProgressReporter reporter) {
var upgrades = new Dictionary<int, ISchemaUpgrade> { var upgrades = new Dictionary<int, ISchemaUpgrade> {
{ 1, new SqliteSchemaUpgradeTo2() }, { 1, new SqliteSchemaUpgradeTo2() },
@ -170,7 +169,6 @@ sealed class SqliteSchema {
{ 3, new SqliteSchemaUpgradeTo4() }, { 3, new SqliteSchemaUpgradeTo4() },
{ 4, new SqliteSchemaUpgradeTo5() }, { 4, new SqliteSchemaUpgradeTo5() },
{ 5, new SqliteSchemaUpgradeTo6() }, { 5, new SqliteSchemaUpgradeTo6() },
{ 6, new SqliteSchemaUpgradeTo7() },
}; };
var perf = Log.Start("from version " + dbVersion); var perf = Log.Start("from version " + dbVersion);

View File

@ -42,8 +42,9 @@ sealed class SqliteConnectionPool : IAsyncDisposable {
var pooledConnection = new PooledConnection(this, conn); var pooledConnection = new PooledConnection(this, conn);
await pooledConnection.ExecuteAsync("PRAGMA journal_mode=WAL", disposalToken); await using (var cmd = pooledConnection.Command("PRAGMA journal_mode=WAL")) {
await pooledConnection.ExecuteAsync("PRAGMA foreign_keys=ON", disposalToken); await cmd.ExecuteNonQueryAsync(disposalToken);
}
all.Add(pooledConnection); all.Add(pooledConnection);
await free.Push(pooledConnection, disposalToken); await free.Push(pooledConnection, disposalToken);

View File

@ -13,7 +13,7 @@ static class SqliteExtensions {
return conn.InnerConnection.BeginTransactionAsync(); return conn.InnerConnection.BeginTransactionAsync();
} }
public static SqliteCommand Command(this ISqliteConnection conn, [LanguageInjection("sql")] string sql) { public static SqliteCommand Command(this ISqliteConnection conn, string sql) {
var cmd = conn.InnerConnection.CreateCommand(); var cmd = conn.InnerConnection.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
return cmd; return cmd;

View File

@ -2,12 +2,12 @@ using System.Collections.Generic;
namespace DHT.Server.Database.Sqlite.Utils; namespace DHT.Server.Database.Sqlite.Utils;
sealed class SqliteConditionBuilder { sealed class SqliteWhereGenerator {
private readonly string? tableAlias; private readonly string? tableAlias;
private readonly bool invert; private readonly bool invert;
private readonly List<string> conditions = []; private readonly List<string> conditions = [];
public SqliteConditionBuilder(string? tableAlias, bool invert) { public SqliteWhereGenerator(string? tableAlias, bool invert) {
this.tableAlias = tableAlias; this.tableAlias = tableAlias;
this.invert = invert; this.invert = invert;
} }
@ -16,20 +16,16 @@ sealed class SqliteConditionBuilder {
conditions.Add(tableAlias == null ? condition : tableAlias + '.' + condition); conditions.Add(tableAlias == null ? condition : tableAlias + '.' + condition);
} }
public string Build() { public string Generate() {
if (conditions.Count == 0) { if (conditions.Count == 0) {
return invert ? "FALSE" : "TRUE"; return "";
} }
if (invert) { if (invert) {
return "NOT (" + string.Join(" AND ", conditions) + ")"; return " WHERE NOT (" + string.Join(" AND ", conditions) + ")";
} }
else { else {
return string.Join(" AND ", conditions); return " WHERE " + string.Join(" AND ", conditions);
} }
} }
public string BuildWhereClause() {
return " WHERE " + Build();
}
} }

View File

@ -1,7 +1,5 @@
using System; using System;
using System.Collections.Frozen; using System.Collections.Frozen;
using System.Diagnostics.CodeAnalysis;
using System.Web;
namespace DHT.Server.Download; namespace DHT.Server.Download;
@ -9,35 +7,9 @@ static class DiscordCdn {
private static FrozenSet<string> CdnHosts { get; } = new[] { private static FrozenSet<string> CdnHosts { get; } = new[] {
"cdn.discordapp.com", "cdn.discordapp.com",
"cdn.discord.com", "cdn.discord.com",
"media.discordapp.net"
}.ToFrozenSet(); }.ToFrozenSet();
private static bool IsCdnUrl(string originalUrl, [NotNullWhen(true)] out Uri? uri) {
return Uri.TryCreate(originalUrl, UriKind.Absolute, out uri) && CdnHosts.Contains(uri.Host);
}
public static string NormalizeUrl(string originalUrl) { public static string NormalizeUrl(string originalUrl) {
return IsCdnUrl(originalUrl, out var uri) ? DoNormalize(uri) : originalUrl; return Uri.TryCreate(originalUrl, UriKind.Absolute, out var uri) && CdnHosts.Contains(uri.Host) ? uri.GetLeftPart(UriPartial.Path) : originalUrl;
}
public static bool NormalizeUrlAndReturnIfCdn(string originalUrl, out string normalizedUrl) {
if (IsCdnUrl(originalUrl, out var uri)) {
normalizedUrl = DoNormalize(uri);
return true;
}
else {
normalizedUrl = originalUrl;
return false;
}
}
private static string DoNormalize(Uri uri) {
var query = HttpUtility.ParseQueryString(uri.Query);
query.Remove("ex");
query.Remove("is");
query.Remove("hm");
return new UriBuilder(uri) { Query = query.ToString() }.Uri.ToString();
} }
} }

View File

@ -1,22 +1,7 @@
using System;
using System.Net;
using DHT.Server.Data;
namespace DHT.Server.Download; namespace DHT.Server.Download;
public readonly struct DownloadItem { public readonly struct DownloadItem {
public string NormalizedUrl { get; init; } public string NormalizedUrl { get; init; }
public string DownloadUrl { get; init; } public string DownloadUrl { get; init; }
public string? Type { get; init; } public ulong Size { get; init; }
public ulong? Size { get; init; }
internal DownloadWithData ToSuccess(byte[] data) {
var size = (ulong) Math.Max(data.LongLength, 0);
return new DownloadWithData(new Data.Download(NormalizedUrl, DownloadUrl, DownloadStatus.Success, Type, size), data);
}
internal DownloadWithData ToFailure(HttpStatusCode? statusCode = null) {
var status = statusCode.HasValue ? (DownloadStatus) (int) statusCode : DownloadStatus.GenericError;
return new DownloadWithData(new Data.Download(NormalizedUrl, DownloadUrl, status, Type, Size), Data: null);
}
} }

View File

@ -1,122 +0,0 @@
using System;
using System.IO;
using System.Net.Mime;
using System.Text.Json;
using System.Threading.Tasks;
using DHT.Server.Data;
using DHT.Server.Data.Embeds;
using DHT.Utils.Logging;
namespace DHT.Server.Download;
static class DownloadLinkExtractor {
private static readonly Log Log = Log.ForType(typeof(DownloadLinkExtractor));
public static Data.Download FromUserAvatar(ulong userId, string avatarPath) {
string url = $"https://cdn.discordapp.com/avatars/{userId}/{avatarPath}.webp";
return new Data.Download(url, url, DownloadStatus.Pending, MediaTypeNames.Image.Webp, size: null);
}
public static Data.Download FromEmoji(ulong emojiId, EmojiFlags flags) {
var isAnimated = flags.HasFlag(EmojiFlags.Animated);
string ext = isAnimated ? "gif" : "webp";
string type = isAnimated ? MediaTypeNames.Image.Gif : MediaTypeNames.Image.Webp;
string url = $"https://cdn.discordapp.com/emojis/{emojiId}.{ext}";
return new Data.Download(url, url, DownloadStatus.Pending, type, size: null);
}
public static Data.Download FromAttachment(Attachment attachment) {
return new Data.Download(attachment.NormalizedUrl, attachment.DownloadUrl, DownloadStatus.Pending, attachment.Type, attachment.Size);
}
public static async Task<Data.Download?> TryFromEmbedJson(Stream jsonStream) {
try {
return FromEmbed(await JsonSerializer.DeserializeAsync(jsonStream, DiscordEmbedJsonContext.Default.DiscordEmbedJson));
} catch (Exception e) {
Log.Error("Could not parse embed json: " + e);
return null;
}
}
public static Data.Download? TryFromEmbedJson(string json) {
try {
return FromEmbed(JsonSerializer.Deserialize(json, DiscordEmbedJsonContext.Default.DiscordEmbedJson));
} catch (Exception e) {
Log.Error("Could not parse embed json: " + e);
return null;
}
}
private static Data.Download? FromEmbed(DiscordEmbedJson? embed) {
if (embed is { Type: "image", Image.Url: {} imageUrl }) {
return FromEmbedImage(imageUrl);
}
else if (embed is { Type: "video", Video.Url: {} videoUrl }) {
return FromEmbedVideo(videoUrl);
}
else {
return null;
}
}
private static Data.Download? FromEmbedImage(string url) {
if (DiscordCdn.NormalizeUrlAndReturnIfCdn(url, out var normalizedUrl)) {
return new Data.Download(normalizedUrl, url, DownloadStatus.Pending, GuessImageType(normalizedUrl), size: null);
}
else {
Log.Debug("Skipping non-CDN image url: " + url);
return null;
}
}
private static Data.Download? FromEmbedVideo(string url) {
if (DiscordCdn.NormalizeUrlAndReturnIfCdn(url, out var normalizedUrl)) {
return new Data.Download(normalizedUrl, url, DownloadStatus.Pending, GuessVideoType(normalizedUrl), size: null);
}
else {
Log.Debug("Skipping non-CDN video url: " + url);
return null;
}
}
private static string? GuessImageType(string url) {
if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) {
return null;
}
ReadOnlySpan<char> extension = Path.GetExtension(uri.AbsolutePath).ToLowerInvariant();
// Remove Twitter quality suffix.
int colonIndex = extension.IndexOf(':');
if (colonIndex != -1) {
extension = extension[..colonIndex];
}
return extension switch {
".jpg" => MediaTypeNames.Image.Jpeg,
".jpeg" => MediaTypeNames.Image.Jpeg,
".png" => MediaTypeNames.Image.Png,
".gif" => MediaTypeNames.Image.Gif,
".webp" => MediaTypeNames.Image.Webp,
".bmp" => MediaTypeNames.Image.Bmp,
_ => null
};
}
private static string? GuessVideoType(string url) {
if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) {
return null;
}
string extension = Path.GetExtension(uri.AbsolutePath).ToLowerInvariant();
return extension switch {
".mp4" => "video/mp4",
".mpeg" => "video/mpeg",
".webm" => "video/webm",
".mov" => "video/quicktime",
_ => null
};
}
}

View File

@ -1,7 +1,6 @@
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Server.Data.Filters;
using DHT.Server.Database; using DHT.Server.Database;
namespace DHT.Server.Download; namespace DHT.Server.Download;
@ -17,10 +16,10 @@ public sealed class Downloader {
this.db = db; this.db = db;
} }
public async Task<IObservable<DownloadItem>> Start(DownloadItemFilter filter) { public async Task<IObservable<DownloadItem>> Start() {
await semaphore.WaitAsync(); await semaphore.WaitAsync();
try { try {
current ??= new DownloaderTask(db, filter); current ??= new DownloaderTask(db);
return current.FinishedItems; return current.FinishedItems;
} finally { } finally {
semaphore.Release(); semaphore.Release();

View File

@ -5,7 +5,6 @@ using System.Reactive.Subjects;
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Server.Data.Filters;
using DHT.Server.Database; using DHT.Server.Database;
using DHT.Utils.Logging; using DHT.Utils.Logging;
using DHT.Utils.Tasks; using DHT.Utils.Tasks;
@ -30,7 +29,6 @@ sealed class DownloaderTask : IAsyncDisposable {
private readonly CancellationToken cancellationToken; private readonly CancellationToken cancellationToken;
private readonly IDatabaseFile db; private readonly IDatabaseFile db;
private readonly DownloadItemFilter filter;
private readonly ISubject<DownloadItem> finishedItemPublisher = Subject.Synchronize(new Subject<DownloadItem>()); private readonly ISubject<DownloadItem> finishedItemPublisher = Subject.Synchronize(new Subject<DownloadItem>());
private readonly Task queueWriterTask; private readonly Task queueWriterTask;
@ -38,9 +36,8 @@ sealed class DownloaderTask : IAsyncDisposable {
public IObservable<DownloadItem> FinishedItems => finishedItemPublisher; public IObservable<DownloadItem> FinishedItems => finishedItemPublisher;
internal DownloaderTask(IDatabaseFile db, DownloadItemFilter filter) { internal DownloaderTask(IDatabaseFile db) {
this.db = db; this.db = db;
this.filter = filter;
this.cancellationToken = cancellationTokenSource.Token; this.cancellationToken = cancellationTokenSource.Token;
this.queueWriterTask = Task.Run(RunQueueWriterTask); this.queueWriterTask = Task.Run(RunQueueWriterTask);
this.downloadTasks = Enumerable.Range(1, DownloadTasks).Select(taskIndex => Task.Run(() => RunDownloadTask(taskIndex))).ToArray(); this.downloadTasks = Enumerable.Range(1, DownloadTasks).Select(taskIndex => Task.Run(() => RunDownloadTask(taskIndex))).ToArray();
@ -48,7 +45,7 @@ sealed class DownloaderTask : IAsyncDisposable {
private async Task RunQueueWriterTask() { private async Task RunQueueWriterTask() {
while (await downloadQueue.Writer.WaitToWriteAsync(cancellationToken)) { while (await downloadQueue.Writer.WaitToWriteAsync(cancellationToken)) {
var newItems = await db.Downloads.PullPendingDownloadItems(QueueSize, filter, cancellationToken).ToListAsync(cancellationToken); var newItems = await db.Downloads.PullEnqueuedDownloadItems(QueueSize, cancellationToken).ToListAsync(cancellationToken);
if (newItems.Count == 0) { if (newItems.Count == 0) {
await Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken); await Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken);
continue; continue;
@ -73,14 +70,14 @@ sealed class DownloaderTask : IAsyncDisposable {
try { try {
var downloadedBytes = await client.GetByteArrayAsync(item.DownloadUrl, cancellationToken); var downloadedBytes = await client.GetByteArrayAsync(item.DownloadUrl, cancellationToken);
await db.Downloads.AddDownload(item.ToSuccess(downloadedBytes)); await db.Downloads.AddDownload(Data.Download.NewSuccess(item, downloadedBytes));
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
// Ignore. // Ignore.
} catch (HttpRequestException e) { } catch (HttpRequestException e) {
await db.Downloads.AddDownload(item.ToFailure(e.StatusCode)); await db.Downloads.AddDownload(Data.Download.NewFailure(item, e.StatusCode, item.Size));
log.Error(e); log.Error(e);
} catch (Exception e) { } catch (Exception e) {
await db.Downloads.AddDownload(item.ToFailure()); await db.Downloads.AddDownload(Data.Download.NewFailure(item, null, item.Size));
log.Error(e); log.Error(e);
} finally { } finally {
try { try {

View File

@ -0,0 +1,24 @@
using System.Net;
using System.Threading.Tasks;
using DHT.Server.Data;
using DHT.Server.Database;
using DHT.Utils.Http;
using Microsoft.AspNetCore.Http;
namespace DHT.Server.Endpoints;
sealed class GetAttachmentEndpoint : BaseEndpoint {
public GetAttachmentEndpoint(IDatabaseFile db) : base(db) {}
protected override async Task<IHttpOutput> Respond(HttpContext ctx) {
string attachmentUrl = WebUtility.UrlDecode((string) ctx.Request.RouteValues["url"]!);
DownloadedAttachment? maybeDownloadedAttachment = await Db.Downloads.GetDownloadedAttachment(attachmentUrl);
if (maybeDownloadedAttachment is {} downloadedAttachment) {
return new HttpOutput.File(downloadedAttachment.Type, downloadedAttachment.Data);
}
else {
return new HttpOutput.Redirect(attachmentUrl, permanent: false);
}
}
}

View File

@ -1,24 +0,0 @@
using System.Net;
using System.Threading.Tasks;
using DHT.Server.Data;
using DHT.Server.Database;
using DHT.Utils.Http;
using Microsoft.AspNetCore.Http;
namespace DHT.Server.Endpoints;
sealed class GetDownloadedFileEndpoint : BaseEndpoint {
public GetDownloadedFileEndpoint(IDatabaseFile db) : base(db) {}
protected override async Task<IHttpOutput> Respond(HttpContext ctx) {
string normalizedUrl = WebUtility.UrlDecode((string) ctx.Request.RouteValues["url"]!);
DownloadWithData? maybeDownloadWithData = await Db.Downloads.GetSuccessfulDownloadWithData(normalizedUrl);
if (maybeDownloadWithData is { Download: {} download, Data: {} data }) {
return new HttpOutput.File(download.Type, data);
}
else {
return new HttpOutput.Redirect(normalizedUrl, permanent: false);
}
}
}

View File

@ -41,7 +41,7 @@ sealed class Startup {
app.UseEndpoints(endpoints => { app.UseEndpoints(endpoints => {
endpoints.MapGet("/get-tracking-script", new GetTrackingScriptEndpoint(db, parameters).Handle); endpoints.MapGet("/get-tracking-script", new GetTrackingScriptEndpoint(db, parameters).Handle);
endpoints.MapGet("/get-downloaded-file/{url}", new GetDownloadedFileEndpoint(db).Handle); endpoints.MapGet("/get-attachment/{url}", new GetAttachmentEndpoint(db).Handle);
endpoints.MapPost("/track-channel", new TrackChannelEndpoint(db).Handle); endpoints.MapPost("/track-channel", new TrackChannelEndpoint(db).Handle);
endpoints.MapPost("/track-users", new TrackUsersEndpoint(db).Handle); endpoints.MapPost("/track-users", new TrackUsersEndpoint(db).Handle);
endpoints.MapPost("/track-messages", new TrackMessagesEndpoint(db).Handle); endpoints.MapPost("/track-messages", new TrackMessagesEndpoint(db).Handle);

View File

@ -2,7 +2,6 @@ using System;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Utils.Logging;
namespace DHT.Utils.Tasks; namespace DHT.Utils.Tasks;
@ -10,9 +9,9 @@ public sealed class ObservableThrottledTask<T> : IObservable<T>, IDisposable {
private readonly ReplaySubject<T> subject; private readonly ReplaySubject<T> subject;
private readonly ThrottledTask<T> task; private readonly ThrottledTask<T> task;
public ObservableThrottledTask(Log log, TaskScheduler resultScheduler) { public ObservableThrottledTask(TaskScheduler resultScheduler) {
this.subject = new ReplaySubject<T>(bufferSize: 1); this.subject = new ReplaySubject<T>(bufferSize: 1);
this.task = new ThrottledTask<T>(log, subject.OnNext, resultScheduler); this.task = new ThrottledTask<T>(subject.OnNext, resultScheduler);
} }
public void Post(Func<CancellationToken, Task<T>> resultComputer) { public void Post(Func<CancellationToken, Task<T>> resultComputer) {

View File

@ -2,7 +2,6 @@ using System;
using System.Threading; using System.Threading;
using System.Threading.Channels; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using DHT.Utils.Logging;
namespace DHT.Utils.Tasks; namespace DHT.Utils.Tasks;
@ -15,11 +14,8 @@ public abstract class ThrottledTaskBase<T> : IDisposable {
}); });
private readonly CancellationTokenSource cancellationTokenSource = new (); private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly Log log;
internal ThrottledTaskBase(Log log) { internal ThrottledTaskBase() {}
this.log = log;
}
protected async Task ReaderTask() { protected async Task ReaderTask() {
var cancellationToken = cancellationTokenSource.Token; var cancellationToken = cancellationTokenSource.Token;
@ -30,8 +26,8 @@ public abstract class ThrottledTaskBase<T> : IDisposable {
await Run(item, cancellationToken); await Run(item, cancellationToken);
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
throw; throw;
} catch (Exception e) { } catch (Exception) {
log.Error("Caught exception in task: " + e); // Ignore.
} }
} }
} catch (OperationCanceledException) { } catch (OperationCanceledException) {
@ -57,7 +53,7 @@ public sealed class ThrottledTask : ThrottledTaskBase<Task> {
private readonly Action resultProcessor; private readonly Action resultProcessor;
private readonly TaskScheduler resultScheduler; private readonly TaskScheduler resultScheduler;
public ThrottledTask(Log log, Action resultProcessor, TaskScheduler resultScheduler) : base(log) { public ThrottledTask(Action resultProcessor, TaskScheduler resultScheduler) {
this.resultProcessor = resultProcessor; this.resultProcessor = resultProcessor;
this.resultScheduler = resultScheduler; this.resultScheduler = resultScheduler;
@ -74,7 +70,7 @@ public sealed class ThrottledTask<T> : ThrottledTaskBase<Task<T>> {
private readonly Action<T> resultProcessor; private readonly Action<T> resultProcessor;
private readonly TaskScheduler resultScheduler; private readonly TaskScheduler resultScheduler;
public ThrottledTask(Log log, Action<T> resultProcessor, TaskScheduler resultScheduler) : base(log) { public ThrottledTask(Action<T> resultProcessor, TaskScheduler resultScheduler) {
this.resultProcessor = resultProcessor; this.resultProcessor = resultProcessor;
this.resultScheduler = resultScheduler; this.resultScheduler = resultScheduler;

Binary file not shown.