mirror of
				https://github.com/chylex/Discord-History-Tracker.git
				synced 2025-10-31 02:17:15 +01:00 
			
		
		
		
	Compare commits
	
		
			8 Commits
		
	
	
		
			c3bf7d5dc3
			...
			v47.2
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 7a8819f95b | |||
| 258bae1865 | |||
| be6203ede9 | |||
| 998893b655 | |||
| 588a1ac0dc | |||
| 2d39825cf5 | |||
| a0325953fd | |||
| 5a33540015 | 
							
								
								
									
										17
									
								
								app/Desktop/Common/AvaloniaObsevableValueExtensions.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								app/Desktop/Common/AvaloniaObsevableValueExtensions.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,17 @@ | |||||||
|  | using System; | ||||||
|  | using System.Threading; | ||||||
|  | using System.Threading.Tasks; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
|  | namespace DHT.Desktop.Common; | ||||||
|  |  | ||||||
|  | static class AvaloniaObsevableValueExtensions { | ||||||
|  | 	public static IDisposable SubscribeLastOnUI<T>(this ObservableValue<T> observable, Action<T> action, TimeSpan delayBetweenRuns) { | ||||||
|  | 		Task Action(T value, CancellationToken cancellationToken) { | ||||||
|  | 			action(value); | ||||||
|  | 			return Task.Delay(delayBetweenRuns, cancellationToken); | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		return observable.SubscribeLast(Action, TaskScheduler.FromCurrentSynchronizationContext()); | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -2,9 +2,7 @@ using System; | |||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.ComponentModel; | using System.ComponentModel; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using Avalonia.ReactiveUI; |  | ||||||
| using DHT.Desktop.Common; | using DHT.Desktop.Common; | ||||||
| using DHT.Server; | using DHT.Server; | ||||||
| using DHT.Server.Data.Filters; | using DHT.Server.Data.Filters; | ||||||
| @@ -49,7 +47,7 @@ sealed partial class DownloadItemFilterPanelModel : IAsyncDisposable { | |||||||
| 	private readonly State state; | 	private readonly State state; | ||||||
| 	private readonly string verb; | 	private readonly string verb; | ||||||
| 	 | 	 | ||||||
| 	private readonly DelayedThrottledTask<FilterSettings> saveFilterSettingsTask; | 	private readonly ThrottledTask<FilterSettings> saveFilterSettingsTask; | ||||||
| 	private bool isLoadingFilterSettings; | 	private bool isLoadingFilterSettings; | ||||||
| 	 | 	 | ||||||
| 	private readonly RestartableTask<long> downloadItemCountTask; | 	private readonly RestartableTask<long> downloadItemCountTask; | ||||||
| @@ -65,10 +63,10 @@ sealed partial class DownloadItemFilterPanelModel : IAsyncDisposable { | |||||||
| 		this.state = state; | 		this.state = state; | ||||||
| 		this.verb = verb; | 		this.verb = verb; | ||||||
| 		 | 		 | ||||||
| 		this.saveFilterSettingsTask = new DelayedThrottledTask<FilterSettings>(Log, TimeSpan.FromSeconds(5), SaveFilterSettings); | 		this.saveFilterSettingsTask = new ThrottledTask<FilterSettings>(Log, SaveFilterSettings, TimeSpan.FromSeconds(5), TaskScheduler.Default); | ||||||
| 		 | 		 | ||||||
| 		this.downloadItemCountTask = new RestartableTask<long>(SetMatchingCount, TaskScheduler.FromCurrentSynchronizationContext()); | 		this.downloadItemCountTask = new RestartableTask<long>(SetMatchingCount, TaskScheduler.FromCurrentSynchronizationContext()); | ||||||
| 		this.downloadItemCountSubscription = state.Db.Downloads.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnDownloadItemCountChanged); | 		this.downloadItemCountSubscription = state.Db.Downloads.TotalCount.SubscribeLastOnUI(OnDownloadItemCountChanged, TimeSpan.FromMilliseconds(15)); | ||||||
| 		 | 		 | ||||||
| 		UpdateFilterStatistics(); | 		UpdateFilterStatistics(); | ||||||
| 		 | 		 | ||||||
|   | |||||||
| @@ -4,10 +4,8 @@ using System.Collections.Immutable; | |||||||
| using System.ComponentModel; | using System.ComponentModel; | ||||||
| using System.Diagnostics.CodeAnalysis; | using System.Diagnostics.CodeAnalysis; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using Avalonia.Controls; | using Avalonia.Controls; | ||||||
| using Avalonia.ReactiveUI; |  | ||||||
| using DHT.Desktop.Common; | using DHT.Desktop.Common; | ||||||
| using DHT.Desktop.Dialogs.CheckBox; | using DHT.Desktop.Dialogs.CheckBox; | ||||||
| using DHT.Desktop.Dialogs.Message; | using DHT.Desktop.Dialogs.Message; | ||||||
| @@ -91,9 +89,9 @@ sealed partial class MessageFilterPanelModel : IDisposable { | |||||||
| 		 | 		 | ||||||
| 		this.exportedMessageCountTask = new RestartableTask<long>(SetExportedMessageCount, TaskScheduler.FromCurrentSynchronizationContext()); | 		this.exportedMessageCountTask = new RestartableTask<long>(SetExportedMessageCount, TaskScheduler.FromCurrentSynchronizationContext()); | ||||||
| 		 | 		 | ||||||
| 		this.messageCountSubscription = state.Db.Messages.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnMessageCountChanged); | 		this.messageCountSubscription = state.Db.Messages.TotalCount.SubscribeLastOnUI(OnMessageCountChanged, TimeSpan.FromMilliseconds(15)); | ||||||
| 		this.channelCountSubscription = state.Db.Channels.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnChannelCountChanged); | 		this.channelCountSubscription = state.Db.Channels.TotalCount.SubscribeLastOnUI(OnChannelCountChanged, TimeSpan.FromMilliseconds(15)); | ||||||
| 		this.userCountSubscription = state.Db.Users.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnUserCountChanged); | 		this.userCountSubscription = state.Db.Users.TotalCount.SubscribeLastOnUI(OnUserCountChanged, TimeSpan.FromMilliseconds(15)); | ||||||
| 		 | 		 | ||||||
| 		UpdateFilterStatistics(); | 		UpdateFilterStatistics(); | ||||||
| 		UpdateChannelFilterLabel(); | 		UpdateChannelFilterLabel(); | ||||||
|   | |||||||
| @@ -1,7 +1,6 @@ | |||||||
| using System; | using System; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using Avalonia.ReactiveUI; |  | ||||||
| using Avalonia.Threading; | using Avalonia.Threading; | ||||||
|  | using DHT.Desktop.Common; | ||||||
| using DHT.Server; | using DHT.Server; | ||||||
| using DHT.Server.Service; | using DHT.Server.Service; | ||||||
| using PropertyChanged.SourceGenerator; | using PropertyChanged.SourceGenerator; | ||||||
| @@ -41,9 +40,9 @@ sealed partial class StatusBarModel : IDisposable { | |||||||
| 	public StatusBarModel(State state) { | 	public StatusBarModel(State state) { | ||||||
| 		this.state = state; | 		this.state = state; | ||||||
| 		 | 		 | ||||||
| 		serverCountSubscription = state.Db.Servers.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(newServerCount => ServerCount = newServerCount); | 		serverCountSubscription = state.Db.Servers.TotalCount.SubscribeLastOnUI(newServerCount => ServerCount = newServerCount, TimeSpan.FromMilliseconds(15)); | ||||||
| 		channelCountSubscription = state.Db.Channels.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(newChannelCount => ChannelCount = newChannelCount); | 		channelCountSubscription = state.Db.Channels.TotalCount.SubscribeLastOnUI(newChannelCount => ChannelCount = newChannelCount, TimeSpan.FromMilliseconds(15)); | ||||||
| 		messageCountSubscription = state.Db.Messages.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(newMessageCount => MessageCount = newMessageCount); | 		messageCountSubscription = state.Db.Messages.TotalCount.SubscribeLastOnUI(newMessageCount => MessageCount = newMessageCount, TimeSpan.FromMilliseconds(15)); | ||||||
| 		 | 		 | ||||||
| 		state.Server.StatusChanged += OnStateServerStatusChanged; | 		state.Server.StatusChanged += OnStateServerStatusChanged; | ||||||
| 		serverStatus = state.Server.IsRunning ? ServerManager.Status.Started : ServerManager.Status.Stopped; | 		serverStatus = state.Server.IsRunning ? ServerManager.Status.Started : ServerManager.Status.Stopped; | ||||||
|   | |||||||
| @@ -55,11 +55,11 @@ sealed class DatabasePageModel { | |||||||
| 				break; | 				break; | ||||||
| 			 | 			 | ||||||
| 			case PlatformID.Unix: | 			case PlatformID.Unix: | ||||||
| 				Process.Start("xdg-open", [ folder ]); | 				Process.Start("xdg-open", [folder]); | ||||||
| 				break; | 				break; | ||||||
| 			 | 			 | ||||||
| 			case PlatformID.MacOSX: | 			case PlatformID.MacOSX: | ||||||
| 				Process.Start("open", [ folder ]); | 				Process.Start("open", [folder]); | ||||||
| 				break; | 				break; | ||||||
| 			 | 			 | ||||||
| 			default: | 			default: | ||||||
| @@ -80,22 +80,25 @@ sealed class DatabasePageModel { | |||||||
| 		 | 		 | ||||||
| 		const string Title = "Database Merge"; | 		const string Title = "Database Merge"; | ||||||
| 		 | 		 | ||||||
| 		ImportResult? result; | 		var result = new TaskCompletionSource<ImportResult?>(); | ||||||
| 		try { | 		try { | ||||||
| 			result = await ProgressDialog.Show(window, Title, async (dialog, callback) => await MergeWithDatabaseFromPaths(Db, paths, dialog, callback)); | 			var dialog = new ProgressDialog(); | ||||||
|  | 			dialog.DataContext = new ProgressDialogModel(Title, async callbacks => result.SetResult(await MergeWithDatabaseFromPaths(Db, paths, dialog, callbacks)), progressItems: 2); | ||||||
|  | 			await dialog.ShowProgressDialog(window); | ||||||
| 		} catch (Exception e) { | 		} catch (Exception e) { | ||||||
| 			Log.Error("Could not merge databases.", e); | 			Log.Error("Could not merge databases.", e); | ||||||
| 			await Dialog.ShowOk(window, Title, "Could not merge databases: " + e.Message); | 			await Dialog.ShowOk(window, Title, "Could not merge databases: " + e.Message); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| 		await Dialog.ShowOk(window, Title, GetImportDialogMessage(result, "database file")); | 		await Dialog.ShowOk(window, Title, GetImportDialogMessage(result.Task.Result, "database file")); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	private static async Task<ImportResult?> MergeWithDatabaseFromPaths(IDatabaseFile target, string[] paths, ProgressDialog dialog, IProgressCallback callback) { | 	private static async Task<ImportResult?> MergeWithDatabaseFromPaths(IDatabaseFile target, string[] paths, ProgressDialog dialog, IReadOnlyList<IProgressCallback> callbacks) { | ||||||
| 		var schemaUpgradeCallbacks = new SchemaUpgradeCallbacks(dialog, paths.Length); | 		var schemaUpgradeCallbacks = new SchemaUpgradeCallbacks(dialog, callbacks[1], paths.Length); | ||||||
|  | 		var databaseMergeProgressCallback = new DatabaseMergeProgressCallback(callbacks[1]); | ||||||
| 		 | 		 | ||||||
| 		return await PerformImport(target, paths, dialog, callback, "Database Merge", async path => { | 		return await PerformImport(target, paths, dialog, callbacks[0], "Database Merge", async path => { | ||||||
| 			IDatabaseFile? db = await DatabaseGui.TryOpenOrCreateDatabaseFromPath(path, dialog, schemaUpgradeCallbacks); | 			IDatabaseFile? db = await DatabaseGui.TryOpenOrCreateDatabaseFromPath(path, dialog, schemaUpgradeCallbacks); | ||||||
| 			 | 			 | ||||||
| 			if (db == null) { | 			if (db == null) { | ||||||
| @@ -103,7 +106,7 @@ sealed class DatabasePageModel { | |||||||
| 			} | 			} | ||||||
| 			 | 			 | ||||||
| 			try { | 			try { | ||||||
| 				await target.AddFrom(db); | 				await target.Merge(db, databaseMergeProgressCallback); | ||||||
| 				return true; | 				return true; | ||||||
| 			} finally { | 			} finally { | ||||||
| 				await db.DisposeAsync(); | 				await db.DisposeAsync(); | ||||||
| @@ -111,7 +114,7 @@ sealed class DatabasePageModel { | |||||||
| 		}); | 		}); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	private sealed class SchemaUpgradeCallbacks(ProgressDialog dialog, int total) : ISchemaUpgradeCallbacks { | 	private sealed class SchemaUpgradeCallbacks(ProgressDialog dialog, IProgressCallback callback, int total) : ISchemaUpgradeCallbacks { | ||||||
| 		private bool? decision; | 		private bool? decision; | ||||||
| 		 | 		 | ||||||
| 		public Task<InitialDatabaseSettings?> GetInitialDatabaseSettings() { | 		public Task<InitialDatabaseSettings?> GetInitialDatabaseSettings() { | ||||||
| @@ -125,6 +128,7 @@ sealed class DatabasePageModel { | |||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| 		public Task Start(int versionSteps, Func<ISchemaUpgradeCallbacks.IProgressReporter, Task> doUpgrade) { | 		public Task Start(int versionSteps, Func<ISchemaUpgradeCallbacks.IProgressReporter, Task> doUpgrade) { | ||||||
|  | 			callback.UpdateIndeterminate("Upgrading database..."); | ||||||
| 			return doUpgrade(new NullReporter()); | 			return doUpgrade(new NullReporter()); | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| @@ -143,6 +147,20 @@ sealed class DatabasePageModel { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
|  | 	private sealed class DatabaseMergeProgressCallback(IProgressCallback callback) : DatabaseMerging.IProgressCallback { | ||||||
|  | 		public void OnImportingMetadata() { | ||||||
|  | 			callback.UpdateIndeterminate("Importing metadata..."); | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		public void OnMessagesImported(long finished, long total) { | ||||||
|  | 			callback.Update("Importing messages...", finished, total); | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		public void OnDownloadsImported(long finished, long total) { | ||||||
|  | 			callback.Update("Importing downloaded files...", finished, total); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
| 	public async Task ImportLegacyArchive() { | 	public async Task ImportLegacyArchive() { | ||||||
| 		string[] paths = await window.StorageProvider.OpenFiles(new FilePickerOpenOptions { | 		string[] paths = await window.StorageProvider.OpenFiles(new FilePickerOpenOptions { | ||||||
| 			Title = "Open Legacy DHT Archive", | 			Title = "Open Legacy DHT Archive", | ||||||
| @@ -223,7 +241,7 @@ sealed class DatabasePageModel { | |||||||
| 		int finished = 0; | 		int finished = 0; | ||||||
| 		 | 		 | ||||||
| 		foreach (string path in paths) { | 		foreach (string path in paths) { | ||||||
| 			await callback.Update(Path.GetFileName(path), finished, total); | 			await callback.Update("File: " + Path.GetFileName(path), finished, total); | ||||||
| 			++finished; | 			++finished; | ||||||
| 			 | 			 | ||||||
| 			if (!File.Exists(path)) { | 			if (!File.Exists(path)) { | ||||||
|   | |||||||
| @@ -2,11 +2,9 @@ using System; | |||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.Collections.ObjectModel; | using System.Collections.ObjectModel; | ||||||
| using System.ComponentModel; | using System.ComponentModel; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using Avalonia.Controls; | using Avalonia.Controls; | ||||||
| using Avalonia.Platform.Storage; | using Avalonia.Platform.Storage; | ||||||
| using Avalonia.ReactiveUI; |  | ||||||
| using DHT.Desktop.Common; | using DHT.Desktop.Common; | ||||||
| using DHT.Desktop.Dialogs.File; | using DHT.Desktop.Dialogs.File; | ||||||
| using DHT.Desktop.Dialogs.Message; | using DHT.Desktop.Dialogs.Message; | ||||||
| @@ -19,6 +17,7 @@ using DHT.Server.Data.Filters; | |||||||
| using DHT.Server.Data.Settings; | using DHT.Server.Data.Settings; | ||||||
| using DHT.Server.Download; | using DHT.Server.Download; | ||||||
| using DHT.Utils.Logging; | using DHT.Utils.Logging; | ||||||
|  | using DHT.Utils.Observables; | ||||||
| using DHT.Utils.Tasks; | using DHT.Utils.Tasks; | ||||||
| using PropertyChanged.SourceGenerator; | using PropertyChanged.SourceGenerator; | ||||||
|  |  | ||||||
| @@ -82,8 +81,8 @@ sealed partial class DownloadsPageModel : IAsyncDisposable { | |||||||
| 			statisticsSkipped, | 			statisticsSkipped, | ||||||
| 		]; | 		]; | ||||||
| 		 | 		 | ||||||
| 		downloadStatisticsTask = new ThrottledTask<DownloadStatusStatistics>(Log, UpdateStatistics, TaskScheduler.FromCurrentSynchronizationContext()); | 		downloadStatisticsTask = new ThrottledTask<DownloadStatusStatistics>(Log, UpdateStatistics, TimeSpan.FromMilliseconds(100), TaskScheduler.FromCurrentSynchronizationContext()); | ||||||
| 		downloadItemCountSubscription = state.Db.Downloads.TotalCount.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnDownloadCountChanged); | 		downloadItemCountSubscription = state.Db.Downloads.TotalCount.SubscribeLastOnUI(OnDownloadCountChanged, TimeSpan.FromMilliseconds(15)); | ||||||
| 		 | 		 | ||||||
| 		RecomputeDownloadStatistics(); | 		RecomputeDownloadStatistics(); | ||||||
| 	} | 	} | ||||||
| @@ -145,8 +144,8 @@ sealed partial class DownloadsPageModel : IAsyncDisposable { | |||||||
| 		 | 		 | ||||||
| 		try { | 		try { | ||||||
| 			currentDownloadFilter = FilterModel.CreateFilter(); | 			currentDownloadFilter = FilterModel.CreateFilter(); | ||||||
| 			IObservable<DownloadItem> finishedItems = await state.Downloader.Start(currentDownloadFilter); | 			ObservableValue<DownloadItem> finishedItems = await state.Downloader.Start(currentDownloadFilter); | ||||||
| 			finishedItemsSubscription = finishedItems.ObserveOn(AvaloniaScheduler.Instance).Subscribe(OnItemFinished); | 			finishedItemsSubscription = finishedItems.SubscribeLastOnUI(OnItemFinished, TimeSpan.FromMilliseconds(15)); | ||||||
| 		} catch (Exception) { | 		} catch (Exception) { | ||||||
| 			finishedItemsSubscription?.Dispose(); | 			finishedItemsSubscription?.Dispose(); | ||||||
| 			finishedItemsSubscription = null; | 			finishedItemsSubscription = null; | ||||||
| @@ -285,7 +284,7 @@ sealed partial class DownloadsPageModel : IAsyncDisposable { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	private void UpdateStatistics(DownloadStatusStatistics statusStatistics) { | 	private Task UpdateStatistics(DownloadStatusStatistics statusStatistics) { | ||||||
| 		statisticsPending.Items = statusStatistics.PendingCount; | 		statisticsPending.Items = statusStatistics.PendingCount; | ||||||
| 		statisticsPending.Size = statusStatistics.PendingTotalSize; | 		statisticsPending.Size = statusStatistics.PendingTotalSize; | ||||||
| 		statisticsPending.HasFilesWithUnknownSize = statusStatistics.PendingWithUnknownSizeCount > 0; | 		statisticsPending.HasFilesWithUnknownSize = statusStatistics.PendingWithUnknownSizeCount > 0; | ||||||
| @@ -304,6 +303,8 @@ sealed partial class DownloadsPageModel : IAsyncDisposable { | |||||||
| 		 | 		 | ||||||
| 		HasSuccessfulDownloads = statusStatistics.SuccessfulCount > 0; | 		HasSuccessfulDownloads = statusStatistics.SuccessfulCount > 0; | ||||||
| 		HasFailedDownloads = statusStatistics.FailedCount > 0; | 		HasFailedDownloads = statusStatistics.FailedCount > 0; | ||||||
|  | 		 | ||||||
|  | 		return Task.CompletedTask; | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	public sealed partial class StatisticsRow(string state) { | 	public sealed partial class StatisticsRow(string state) { | ||||||
|   | |||||||
| @@ -107,7 +107,7 @@ export default (function() { | |||||||
| 	const isImageUrl = function(url) { | 	const isImageUrl = function(url) { | ||||||
| 		const dot = url.pathname.lastIndexOf("."); | 		const dot = url.pathname.lastIndexOf("."); | ||||||
| 		const ext = dot === -1 ? "" : url.pathname.substring(dot).toLowerCase(); | 		const ext = dot === -1 ? "" : url.pathname.substring(dot).toLowerCase(); | ||||||
| 		return ext === ".png" || ext === ".gif" || ext === ".jpg" || ext === ".jpeg"; | 		return ext === ".png" || ext === ".gif" || ext === ".jpg" || ext === ".jpeg" || ext === ".webp" || ext === ".avif"; | ||||||
| 	}; | 	}; | ||||||
| 	 | 	 | ||||||
| 	return { | 	return { | ||||||
|   | |||||||
| @@ -1,34 +0,0 @@ | |||||||
| using System.Collections.Generic; |  | ||||||
| using System.Linq; |  | ||||||
| using System.Threading.Tasks; |  | ||||||
| using DHT.Server.Data; |  | ||||||
|  |  | ||||||
| namespace DHT.Server.Database; |  | ||||||
|  |  | ||||||
| public static class DatabaseExtensions { |  | ||||||
| 	public static async Task AddFrom(this IDatabaseFile target, IDatabaseFile source) { |  | ||||||
| 		await target.Users.Add(await source.Users.Get().ToListAsync()); |  | ||||||
| 		await target.Servers.Add(await source.Servers.Get().ToListAsync()); |  | ||||||
| 		await target.Channels.Add(await source.Channels.Get().ToListAsync()); |  | ||||||
| 		 |  | ||||||
| 		const int MessageBatchSize = 100; |  | ||||||
| 		List<Message> batchedMessages = new (MessageBatchSize); |  | ||||||
| 		 |  | ||||||
| 		await foreach (Message message in source.Messages.Get()) { |  | ||||||
| 			batchedMessages.Add(message); |  | ||||||
| 			 |  | ||||||
| 			if (batchedMessages.Count >= MessageBatchSize) { |  | ||||||
| 				await target.Messages.Add(batchedMessages); |  | ||||||
| 				batchedMessages.Clear(); |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		 |  | ||||||
| 		await target.Messages.Add(batchedMessages); |  | ||||||
| 		 |  | ||||||
| 		await foreach (Data.Download download in source.Downloads.Get()) { |  | ||||||
| 			if (download.Status != DownloadStatus.Success || !await source.Downloads.GetDownloadData(download.NormalizedUrl, stream => target.Downloads.AddDownload(download, stream))) { |  | ||||||
| 				await target.Downloads.AddDownload(download, stream: null); |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
							
								
								
									
										77
									
								
								app/Server/Database/Import/DatabaseMerging.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								app/Server/Database/Import/DatabaseMerging.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,77 @@ | |||||||
|  | using System.Collections.Generic; | ||||||
|  | using System.Linq; | ||||||
|  | using System.Threading.Tasks; | ||||||
|  | using DHT.Server.Data; | ||||||
|  |  | ||||||
|  | namespace DHT.Server.Database.Import; | ||||||
|  |  | ||||||
|  | public static class DatabaseMerging { | ||||||
|  | 	public static async Task Merge(this IDatabaseFile target, IDatabaseFile source, IProgressCallback callback) { | ||||||
|  | 		// Import downloads first, otherwise automatic downloads would try to re-download files from other imported data. | ||||||
|  | 		await MergeDownloads(target, source, callback); | ||||||
|  | 		 | ||||||
|  | 		callback.OnImportingMetadata(); | ||||||
|  | 		await target.Users.Add(await source.Users.Get().ToListAsync()); | ||||||
|  | 		await target.Servers.Add(await source.Servers.Get().ToListAsync()); | ||||||
|  | 		await target.Channels.Add(await source.Channels.Get().ToListAsync()); | ||||||
|  | 		 | ||||||
|  | 		await MergeMessages(target, source, callback); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	private static async Task MergeDownloads(IDatabaseFile target, IDatabaseFile source, IProgressCallback callback) { | ||||||
|  | 		const int ReportBatchSize = 100; | ||||||
|  | 		 | ||||||
|  | 		long totalDownloads = await source.Downloads.Count(); | ||||||
|  | 		long importedDownloads = 0; | ||||||
|  | 		 | ||||||
|  | 		callback.OnDownloadsImported(importedDownloads, totalDownloads); | ||||||
|  | 		 | ||||||
|  | 		await foreach (Data.Download download in source.Downloads.Get()) { | ||||||
|  | 			if (download.Status != DownloadStatus.Success || !await source.Downloads.GetDownloadData(download.NormalizedUrl, stream => target.Downloads.AddDownload(download, stream))) { | ||||||
|  | 				await target.Downloads.AddDownload(download, stream: null); | ||||||
|  | 			} | ||||||
|  | 			 | ||||||
|  | 			if (++importedDownloads % ReportBatchSize == 0) { | ||||||
|  | 				callback.OnDownloadsImported(importedDownloads, totalDownloads); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		callback.OnDownloadsImported(totalDownloads, totalDownloads); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	private static async Task MergeMessages(IDatabaseFile target, IDatabaseFile source, IProgressCallback callback) { | ||||||
|  | 		const int MessageBatchSize = 100; | ||||||
|  | 		const int ReportEveryBatches = 10; | ||||||
|  | 		List<Message> batchedMessages = new (MessageBatchSize); | ||||||
|  | 		 | ||||||
|  | 		long totalMessages = await source.Messages.Count(); | ||||||
|  | 		long importedMessages = 0; | ||||||
|  | 		 | ||||||
|  | 		callback.OnMessagesImported(importedMessages, totalMessages); | ||||||
|  | 		 | ||||||
|  | 		await foreach (Message message in source.Messages.Get()) { | ||||||
|  | 			batchedMessages.Add(message); | ||||||
|  | 			 | ||||||
|  | 			if (batchedMessages.Count >= MessageBatchSize) { | ||||||
|  | 				await target.Messages.Add(batchedMessages); | ||||||
|  | 				 | ||||||
|  | 				importedMessages += batchedMessages.Count; | ||||||
|  | 				 | ||||||
|  | 				if (importedMessages % (MessageBatchSize * ReportEveryBatches) == 0) { | ||||||
|  | 					callback.OnMessagesImported(importedMessages, totalMessages); | ||||||
|  | 				} | ||||||
|  | 				 | ||||||
|  | 				batchedMessages.Clear(); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		await target.Messages.Add(batchedMessages); | ||||||
|  | 		callback.OnMessagesImported(totalMessages, totalMessages); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	public interface IProgressCallback { | ||||||
|  | 		void OnImportingMetadata(); | ||||||
|  | 		void OnMessagesImported(long finished, long total); | ||||||
|  | 		void OnDownloadsImported(long finished, long total); | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -1,15 +1,14 @@ | |||||||
| using System; |  | ||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data; | using DHT.Server.Data; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Repositories; | namespace DHT.Server.Database.Repositories; | ||||||
|  |  | ||||||
| public interface IChannelRepository { | public interface IChannelRepository { | ||||||
| 	IObservable<long> TotalCount { get; } | 	ObservableValue<long> TotalCount { get; } | ||||||
| 	 | 	 | ||||||
| 	Task Add(IReadOnlyList<Channel> channels); | 	Task Add(IReadOnlyList<Channel> channels); | ||||||
| 	 | 	 | ||||||
| @@ -20,7 +19,7 @@ public interface IChannelRepository { | |||||||
| 	Task<int> RemoveUnreachable(); | 	Task<int> RemoveUnreachable(); | ||||||
| 	 | 	 | ||||||
| 	internal sealed class Dummy : IChannelRepository { | 	internal sealed class Dummy : IChannelRepository { | ||||||
| 		public IObservable<long> TotalCount { get; } = Observable.Return(0L); | 		public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 		 | 		 | ||||||
| 		public Task Add(IReadOnlyList<Channel> channels) { | 		public Task Add(IReadOnlyList<Channel> channels) { | ||||||
| 			return Task.CompletedTask; | 			return Task.CompletedTask; | ||||||
|   | |||||||
| @@ -2,22 +2,22 @@ using System; | |||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.IO; | using System.IO; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data; | using DHT.Server.Data; | ||||||
| using DHT.Server.Data.Aggregations; | using DHT.Server.Data.Aggregations; | ||||||
| using DHT.Server.Data.Filters; | using DHT.Server.Data.Filters; | ||||||
| using DHT.Server.Download; | using DHT.Server.Download; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Repositories; | namespace DHT.Server.Database.Repositories; | ||||||
|  |  | ||||||
| public interface IDownloadRepository { | public interface IDownloadRepository { | ||||||
| 	IObservable<long> TotalCount { get; } | 	ObservableValue<long> TotalCount { get; } | ||||||
| 	 | 	 | ||||||
| 	Task AddDownload(Data.Download item, Stream? stream); | 	Task AddDownload(Data.Download item, Stream? stream); | ||||||
| 	 | 	 | ||||||
| 	Task<long> Count(DownloadItemFilter filter, CancellationToken cancellationToken = default); | 	Task<long> Count(DownloadItemFilter? filter = null, CancellationToken cancellationToken = default); | ||||||
| 	 | 	 | ||||||
| 	Task<DownloadStatusStatistics> GetStatistics(DownloadItemFilter nonSkippedFilter, CancellationToken cancellationToken = default); | 	Task<DownloadStatusStatistics> GetStatistics(DownloadItemFilter nonSkippedFilter, CancellationToken cancellationToken = default); | ||||||
| 	 | 	 | ||||||
| @@ -38,13 +38,13 @@ public interface IDownloadRepository { | |||||||
| 	IAsyncEnumerable<FileUrl> FindReachableFiles(CancellationToken cancellationToken = default); | 	IAsyncEnumerable<FileUrl> FindReachableFiles(CancellationToken cancellationToken = default); | ||||||
| 	 | 	 | ||||||
| 	internal sealed class Dummy : IDownloadRepository { | 	internal sealed class Dummy : IDownloadRepository { | ||||||
| 		public IObservable<long> TotalCount { get; } = Observable.Return(0L); | 		public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 		 | 		 | ||||||
| 		public Task AddDownload(Data.Download item, Stream? stream) { | 		public Task AddDownload(Data.Download item, Stream? stream) { | ||||||
| 			return Task.CompletedTask; | 			return Task.CompletedTask; | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| 		public Task<long> Count(DownloadItemFilter filter, CancellationToken cancellationToken) { | 		public Task<long> Count(DownloadItemFilter? filter, CancellationToken cancellationToken) { | ||||||
| 			return Task.FromResult(0L); | 			return Task.FromResult(0L); | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
|   | |||||||
| @@ -1,16 +1,15 @@ | |||||||
| using System; |  | ||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data; | using DHT.Server.Data; | ||||||
| using DHT.Server.Data.Filters; | using DHT.Server.Data.Filters; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Repositories; | namespace DHT.Server.Database.Repositories; | ||||||
|  |  | ||||||
| public interface IMessageRepository { | public interface IMessageRepository { | ||||||
| 	IObservable<long> TotalCount { get; } | 	ObservableValue<long> TotalCount { get; } | ||||||
| 	 | 	 | ||||||
| 	Task Add(IReadOnlyList<Message> messages); | 	Task Add(IReadOnlyList<Message> messages); | ||||||
| 	 | 	 | ||||||
| @@ -25,7 +24,7 @@ public interface IMessageRepository { | |||||||
| 	Task<int> RemoveUnreachableAttachments(); | 	Task<int> RemoveUnreachableAttachments(); | ||||||
| 	 | 	 | ||||||
| 	internal sealed class Dummy : IMessageRepository { | 	internal sealed class Dummy : IMessageRepository { | ||||||
| 		public IObservable<long> TotalCount { get; } = Observable.Return(0L); | 		public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 		 | 		 | ||||||
| 		public Task Add(IReadOnlyList<Message> messages) { | 		public Task Add(IReadOnlyList<Message> messages) { | ||||||
| 			return Task.CompletedTask; | 			return Task.CompletedTask; | ||||||
|   | |||||||
| @@ -1,14 +1,13 @@ | |||||||
| using System; |  | ||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Repositories; | namespace DHT.Server.Database.Repositories; | ||||||
|  |  | ||||||
| public interface IServerRepository { | public interface IServerRepository { | ||||||
| 	IObservable<long> TotalCount { get; } | 	ObservableValue<long> TotalCount { get; } | ||||||
| 	 | 	 | ||||||
| 	Task Add(IReadOnlyList<Data.Server> servers); | 	Task Add(IReadOnlyList<Data.Server> servers); | ||||||
| 	 | 	 | ||||||
| @@ -19,7 +18,7 @@ public interface IServerRepository { | |||||||
| 	Task<int> RemoveUnreachable(); | 	Task<int> RemoveUnreachable(); | ||||||
| 	 | 	 | ||||||
| 	internal sealed class Dummy : IServerRepository { | 	internal sealed class Dummy : IServerRepository { | ||||||
| 		public IObservable<long> TotalCount { get; } = Observable.Return(0L); | 		public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 		 | 		 | ||||||
| 		public Task Add(IReadOnlyList<Data.Server> servers) { | 		public Task Add(IReadOnlyList<Data.Server> servers) { | ||||||
| 			return Task.CompletedTask; | 			return Task.CompletedTask; | ||||||
|   | |||||||
| @@ -1,15 +1,14 @@ | |||||||
| using System; |  | ||||||
| using System.Collections.Generic; | using System.Collections.Generic; | ||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data; | using DHT.Server.Data; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Repositories; | namespace DHT.Server.Database.Repositories; | ||||||
|  |  | ||||||
| public interface IUserRepository { | public interface IUserRepository { | ||||||
| 	IObservable<long> TotalCount { get; } | 	ObservableValue<long> TotalCount { get; } | ||||||
| 	 | 	 | ||||||
| 	Task Add(IReadOnlyList<User> users); | 	Task Add(IReadOnlyList<User> users); | ||||||
| 	 | 	 | ||||||
| @@ -20,7 +19,7 @@ public interface IUserRepository { | |||||||
| 	Task<int> RemoveUnreachable(); | 	Task<int> RemoveUnreachable(); | ||||||
| 	 | 	 | ||||||
| 	internal sealed class Dummy : IUserRepository { | 	internal sealed class Dummy : IUserRepository { | ||||||
| 		public IObservable<long> TotalCount { get; } = Observable.Return(0L); | 		public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 		 | 		 | ||||||
| 		public Task Add(IReadOnlyList<User> users) { | 		public Task Add(IReadOnlyList<User> users) { | ||||||
| 			return Task.CompletedTask; | 			return Task.CompletedTask; | ||||||
|   | |||||||
| @@ -1,23 +1,27 @@ | |||||||
| using System; | using System; | ||||||
| using System.Reactive.Linq; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Utils.Logging; | using DHT.Utils.Logging; | ||||||
|  | using DHT.Utils.Observables; | ||||||
| using DHT.Utils.Tasks; | using DHT.Utils.Tasks; | ||||||
|  |  | ||||||
| namespace DHT.Server.Database.Sqlite.Repositories; | namespace DHT.Server.Database.Sqlite.Repositories; | ||||||
|  |  | ||||||
| abstract class BaseSqliteRepository : IDisposable { | abstract class BaseSqliteRepository : IDisposable { | ||||||
| 	private readonly ObservableThrottledTask<long> totalCountTask; | 	private readonly ThrottledTask<long> totalCountTask; | ||||||
| 	 | 	 | ||||||
| 	public IObservable<long> TotalCount { get; } | 	public ObservableValue<long> TotalCount { get; } = new (0L); | ||||||
| 	 | 	 | ||||||
| 	protected BaseSqliteRepository(Log log) { | 	protected BaseSqliteRepository(Log log) { | ||||||
| 		totalCountTask = new ObservableThrottledTask<long>(log, TaskScheduler.Default); | 		totalCountTask = new ThrottledTask<long>(log, SetTotalCount, TimeSpan.Zero, TaskScheduler.Default); | ||||||
| 		TotalCount = totalCountTask.DistinctUntilChanged(); |  | ||||||
| 		UpdateTotalCount(); | 		UpdateTotalCount(); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
|  | 	private Task SetTotalCount(long newCount) { | ||||||
|  | 		TotalCount.Set(newCount); | ||||||
|  | 		return Task.CompletedTask; | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
| 	public void Dispose() { | 	public void Dispose() { | ||||||
| 		totalCountTask.Dispose(); | 		totalCountTask.Dispose(); | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -306,12 +306,14 @@ sealed class SqliteDownloadRepository(SqliteConnectionPool pool) : BaseSqliteRep | |||||||
| 			await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | 			await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | ||||||
| 			 | 			 | ||||||
| 			while (await reader.ReadAsync(cancellationToken)) { | 			while (await reader.ReadAsync(cancellationToken)) { | ||||||
| 				found.Add(new DownloadItem { | 				var item = new DownloadItem( | ||||||
| 					NormalizedUrl = reader.GetString(0), | 					NormalizedUrl: reader.GetString(0), | ||||||
| 					DownloadUrl = reader.GetString(1), | 					DownloadUrl: reader.GetString(1), | ||||||
| 					Type = reader.IsDBNull(2) ? null : reader.GetString(2), | 					Type: reader.IsDBNull(2) ? null : reader.GetString(2), | ||||||
| 					Size = reader.IsDBNull(3) ? null : reader.GetUint64(3), | 					Size: reader.IsDBNull(3) ? null : reader.GetUint64(3) | ||||||
| 				}); | 				); | ||||||
|  | 				 | ||||||
|  | 				found.Add(item); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
|   | |||||||
| @@ -4,12 +4,7 @@ using DHT.Server.Data; | |||||||
|  |  | ||||||
| namespace DHT.Server.Download; | namespace DHT.Server.Download; | ||||||
|  |  | ||||||
| public readonly struct DownloadItem { | public sealed record DownloadItem(string NormalizedUrl, string DownloadUrl, string? Type, ulong? Size) { | ||||||
| 	public string NormalizedUrl { get; init; } |  | ||||||
| 	public string DownloadUrl { get; init; } |  | ||||||
| 	public string? Type { get; init; } |  | ||||||
| 	public ulong? Size { get; init; } |  | ||||||
| 	 |  | ||||||
| 	internal Data.Download ToSuccess(long size) { | 	internal Data.Download ToSuccess(long size) { | ||||||
| 		return new Data.Download(NormalizedUrl, DownloadUrl, DownloadStatus.Success, Type, (ulong) Math.Max(size, val2: 0)); | 		return new Data.Download(NormalizedUrl, DownloadUrl, DownloadStatus.Success, Type, (ulong) Math.Max(size, val2: 0)); | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -1,8 +1,8 @@ | |||||||
| using System; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data.Filters; | using DHT.Server.Data.Filters; | ||||||
| using DHT.Server.Database; | using DHT.Server.Database; | ||||||
|  | using DHT.Utils.Observables; | ||||||
|  |  | ||||||
| namespace DHT.Server.Download; | namespace DHT.Server.Download; | ||||||
|  |  | ||||||
| @@ -19,11 +19,11 @@ public sealed class Downloader { | |||||||
| 		this.concurrentDownloads = concurrentDownloads; | 		this.concurrentDownloads = concurrentDownloads; | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	public async Task<IObservable<DownloadItem>> Start(DownloadItemFilter filter) { | 	public async Task<ObservableValue<DownloadItem>> Start(DownloadItemFilter filter) { | ||||||
| 		await semaphore.WaitAsync(); | 		await semaphore.WaitAsync(); | ||||||
| 		try { | 		try { | ||||||
| 			current ??= new DownloaderTask(db, filter, concurrentDownloads); | 			current ??= new DownloaderTask(db, filter, concurrentDownloads); | ||||||
| 			return current.FinishedItems; | 			return current.LastFinishedItem; | ||||||
| 		} finally { | 		} finally { | ||||||
| 			semaphore.Release(); | 			semaphore.Release(); | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -4,13 +4,13 @@ using System.IO; | |||||||
| using System.Linq; | using System.Linq; | ||||||
| using System.Net; | using System.Net; | ||||||
| using System.Net.Http; | using System.Net.Http; | ||||||
| using System.Reactive.Subjects; |  | ||||||
| using System.Threading; | using System.Threading; | ||||||
| using System.Threading.Channels; | using System.Threading.Channels; | ||||||
| using System.Threading.Tasks; | using System.Threading.Tasks; | ||||||
| using DHT.Server.Data.Filters; | using DHT.Server.Data.Filters; | ||||||
| using DHT.Server.Database; | using DHT.Server.Database; | ||||||
| using DHT.Utils.Logging; | using DHT.Utils.Logging; | ||||||
|  | using DHT.Utils.Observables; | ||||||
| using DHT.Utils.Tasks; | using DHT.Utils.Tasks; | ||||||
|  |  | ||||||
| namespace DHT.Server.Download; | namespace DHT.Server.Download; | ||||||
| @@ -38,12 +38,11 @@ sealed class DownloaderTask : IAsyncDisposable { | |||||||
| 	 | 	 | ||||||
| 	private readonly IDatabaseFile db; | 	private readonly IDatabaseFile db; | ||||||
| 	private readonly DownloadItemFilter filter; | 	private readonly DownloadItemFilter filter; | ||||||
| 	private readonly ISubject<DownloadItem> finishedItemPublisher = Subject.Synchronize(new Subject<DownloadItem>()); |  | ||||||
| 	 | 	 | ||||||
| 	private readonly Task queueWriterTask; | 	private readonly Task queueWriterTask; | ||||||
| 	private readonly Task[] downloadTasks; | 	private readonly Task[] downloadTasks; | ||||||
| 	 | 	 | ||||||
| 	public IObservable<DownloadItem> FinishedItems => finishedItemPublisher; | 	public ObservableValue<DownloadItem> LastFinishedItem { get; } = new (null); | ||||||
| 	 | 	 | ||||||
| 	internal DownloaderTask(IDatabaseFile db, DownloadItemFilter filter, int? concurrentDownloads) { | 	internal DownloaderTask(IDatabaseFile db, DownloadItemFilter filter, int? concurrentDownloads) { | ||||||
| 		this.db = db; | 		this.db = db; | ||||||
| @@ -101,7 +100,7 @@ sealed class DownloaderTask : IAsyncDisposable { | |||||||
| 				log.Error("Could not download file: " + item.DownloadUrl, e); | 				log.Error("Could not download file: " + item.DownloadUrl, e); | ||||||
| 			} finally { | 			} finally { | ||||||
| 				try { | 				try { | ||||||
| 					finishedItemPublisher.OnNext(item); | 					LastFinishedItem.Set(item); | ||||||
| 				} catch (Exception e) { | 				} catch (Exception e) { | ||||||
| 					log.Error("Caught exception in event handler: " + e); | 					log.Error("Caught exception in event handler: " + e); | ||||||
| 				} | 				} | ||||||
| @@ -145,7 +144,6 @@ sealed class DownloaderTask : IAsyncDisposable { | |||||||
| 			await Task.WhenAll(downloadTasks).WaitIgnoringCancellation(); | 			await Task.WhenAll(downloadTasks).WaitIgnoringCancellation(); | ||||||
| 		} finally { | 		} finally { | ||||||
| 			cancellationTokenSource.Dispose(); | 			cancellationTokenSource.Dispose(); | ||||||
| 			finishedItemPublisher.OnCompleted(); |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										63
									
								
								app/Utils/Observables/LastValueObserver.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								app/Utils/Observables/LastValueObserver.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,63 @@ | |||||||
|  | using System; | ||||||
|  | using System.Threading; | ||||||
|  | using System.Threading.Channels; | ||||||
|  | using System.Threading.Tasks; | ||||||
|  |  | ||||||
|  | namespace DHT.Utils.Observables; | ||||||
|  |  | ||||||
|  | sealed class LastValueObserver<T> : IDisposable { | ||||||
|  | 	private readonly ObservableValue<T> observable; | ||||||
|  | 	private readonly Func<T, CancellationToken, Task> action; | ||||||
|  | 	private readonly TaskScheduler scheduler; | ||||||
|  | 	 | ||||||
|  | 	private readonly Channel<T> channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity: 1) { | ||||||
|  | 		AllowSynchronousContinuations = false, | ||||||
|  | 		FullMode = BoundedChannelFullMode.DropOldest, | ||||||
|  | 		SingleReader = true, | ||||||
|  | 		SingleWriter = false, | ||||||
|  | 	}); | ||||||
|  | 	 | ||||||
|  | 	private readonly CancellationTokenSource cancellationTokenSource = new (); | ||||||
|  | 	 | ||||||
|  | 	public LastValueObserver(ObservableValue<T> observable, Func<T, CancellationToken, Task> action, TaskScheduler scheduler) { | ||||||
|  | 		this.observable = observable; | ||||||
|  | 		this.action = action; | ||||||
|  | 		this.scheduler = scheduler; | ||||||
|  | 		 | ||||||
|  | 		_ = ReadNextValue(); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	private async Task ReadNextValue() { | ||||||
|  | 		CancellationToken cancellationToken = cancellationTokenSource.Token; | ||||||
|  | 		 | ||||||
|  | 		try { | ||||||
|  | 			await foreach (T value in channel.Reader.ReadAllAsync(cancellationToken)) { | ||||||
|  | 				try { | ||||||
|  | 					await Task.Factory.StartNew(UseValue, value, CancellationToken.None, TaskCreationOptions.None, scheduler).WaitAsync(cancellationToken); | ||||||
|  | 				} catch (Exception) { | ||||||
|  | 					// Ignore. | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} finally { | ||||||
|  | 			cancellationTokenSource.Dispose(); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	private Task UseValue(object? value) { | ||||||
|  | 		return action((T) value!, cancellationTokenSource.Token); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	public void Notify(T value) { | ||||||
|  | 		channel.Writer.TryWrite(value); | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	public void Dispose() { | ||||||
|  | 		observable.Unsubscribe(this); | ||||||
|  | 		 | ||||||
|  | 		try { | ||||||
|  | 			cancellationTokenSource.Cancel(); | ||||||
|  | 		} catch (ObjectDisposedException) {} | ||||||
|  | 		 | ||||||
|  | 		channel.Writer.TryComplete(); | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										45
									
								
								app/Utils/Observables/ObservableValue.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								app/Utils/Observables/ObservableValue.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,45 @@ | |||||||
|  | using System; | ||||||
|  | using System.Collections.Generic; | ||||||
|  | using System.Threading; | ||||||
|  | using System.Threading.Tasks; | ||||||
|  |  | ||||||
|  | namespace DHT.Utils.Observables; | ||||||
|  |  | ||||||
|  | public sealed class ObservableValue<T>(T? value) { | ||||||
|  | 	private readonly List<LastValueObserver<T>> observers = []; | ||||||
|  | 	private T? value = value; | ||||||
|  | 	 | ||||||
|  | 	public void Set(T value) { | ||||||
|  | 		lock (this) { | ||||||
|  | 			if (EqualityComparer<T>.Default.Equals(value, this.value)) { | ||||||
|  | 				return; | ||||||
|  | 			} | ||||||
|  | 			 | ||||||
|  | 			this.value = value; | ||||||
|  | 			 | ||||||
|  | 			foreach (var observer in observers) { | ||||||
|  | 				observer.Notify(value); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	public IDisposable SubscribeLast(Func<T, CancellationToken, Task> action, TaskScheduler scheduler) { | ||||||
|  | 		var observer = new LastValueObserver<T>(this, action, scheduler); | ||||||
|  | 		 | ||||||
|  | 		lock (this) { | ||||||
|  | 			observers.Add(observer); | ||||||
|  | 			 | ||||||
|  | 			if (value is not null) { | ||||||
|  | 				observer.Notify(value); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		 | ||||||
|  | 		return observer; | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	internal void Unsubscribe(LastValueObserver<T> observer) { | ||||||
|  | 		lock (this) { | ||||||
|  | 			observers.Remove(observer); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -1,64 +0,0 @@ | |||||||
| using System; |  | ||||||
| using System.Threading; |  | ||||||
| using System.Threading.Channels; |  | ||||||
| using System.Threading.Tasks; |  | ||||||
| using DHT.Utils.Logging; |  | ||||||
|  |  | ||||||
| namespace DHT.Utils.Tasks; |  | ||||||
|  |  | ||||||
| public sealed class DelayedThrottledTask<T> : IDisposable { |  | ||||||
| 	private readonly Channel<T> taskChannel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity: 1) { |  | ||||||
| 		SingleReader = true, |  | ||||||
| 		SingleWriter = false, |  | ||||||
| 		AllowSynchronousContinuations = false, |  | ||||||
| 		FullMode = BoundedChannelFullMode.DropOldest, |  | ||||||
| 	}); |  | ||||||
| 	 |  | ||||||
| 	private readonly CancellationTokenSource cancellationTokenSource = new (); |  | ||||||
| 	private readonly Log log; |  | ||||||
| 	private readonly TimeSpan delay; |  | ||||||
| 	private readonly Func<T, Task> inputProcessor; |  | ||||||
| 	 |  | ||||||
| 	public DelayedThrottledTask(Log log, TimeSpan delay, Func<T, Task> inputProcessor) { |  | ||||||
| 		this.log = log; |  | ||||||
| 		this.delay = delay; |  | ||||||
| 		this.inputProcessor = inputProcessor; |  | ||||||
| 		 |  | ||||||
| 		Task.Run(ReaderTask); |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	private async Task ReaderTask() { |  | ||||||
| 		CancellationToken cancellationToken = cancellationTokenSource.Token; |  | ||||||
| 		 |  | ||||||
| 		try { |  | ||||||
| 			while (await taskChannel.Reader.WaitToReadAsync(cancellationToken)) { |  | ||||||
| 				await Task.Delay(delay, cancellationToken); |  | ||||||
| 				 |  | ||||||
| 				T input = await taskChannel.Reader.ReadAsync(cancellationToken); |  | ||||||
| 				try { |  | ||||||
| 					await inputProcessor(input); |  | ||||||
| 				} catch (OperationCanceledException) { |  | ||||||
| 					throw; |  | ||||||
| 				} catch (Exception e) { |  | ||||||
| 					log.Error("Caught exception in task: " + e); |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} catch (OperationCanceledException) { |  | ||||||
| 			// Ignore. |  | ||||||
| 		} finally { |  | ||||||
| 			cancellationTokenSource.Dispose(); |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	public void Post(T input) { |  | ||||||
| 		taskChannel.Writer.TryWrite(input); |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	public void Dispose() { |  | ||||||
| 		try { |  | ||||||
| 			cancellationTokenSource.Cancel(); |  | ||||||
| 		} catch (ObjectDisposedException) {} |  | ||||||
| 		 |  | ||||||
| 		taskChannel.Writer.Complete(); |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| @@ -1,32 +0,0 @@ | |||||||
| using System; |  | ||||||
| using System.Reactive.Subjects; |  | ||||||
| using System.Threading; |  | ||||||
| using System.Threading.Tasks; |  | ||||||
| using DHT.Utils.Logging; |  | ||||||
|  |  | ||||||
| namespace DHT.Utils.Tasks; |  | ||||||
|  |  | ||||||
| public sealed class ObservableThrottledTask<T> : IObservable<T>, IDisposable { |  | ||||||
| 	private readonly ReplaySubject<T> subject; |  | ||||||
| 	private readonly ThrottledTask<T> task; |  | ||||||
| 	 |  | ||||||
| 	public ObservableThrottledTask(Log log, TaskScheduler resultScheduler) { |  | ||||||
| 		this.subject = new ReplaySubject<T>(bufferSize: 1); |  | ||||||
| 		this.task = new ThrottledTask<T>(log, subject.OnNext, resultScheduler); |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	public void Post(Func<CancellationToken, Task<T>> resultComputer) { |  | ||||||
| 		task.Post(resultComputer); |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	public IDisposable Subscribe(IObserver<T> observer) { |  | ||||||
| 		return subject.Subscribe(observer); |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	public void Dispose() { |  | ||||||
| 		task.Dispose(); |  | ||||||
| 		 |  | ||||||
| 		subject.OnCompleted(); |  | ||||||
| 		subject.Dispose(); |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| @@ -14,11 +14,13 @@ public abstract class ThrottledTaskBase<T> : IDisposable { | |||||||
| 		FullMode = BoundedChannelFullMode.DropOldest, | 		FullMode = BoundedChannelFullMode.DropOldest, | ||||||
| 	}); | 	}); | ||||||
| 	 | 	 | ||||||
| 	private readonly CancellationTokenSource cancellationTokenSource = new (); |  | ||||||
| 	private readonly Log log; | 	private readonly Log log; | ||||||
|  | 	private readonly TimeSpan delayBetweenRuns; | ||||||
|  | 	private readonly CancellationTokenSource cancellationTokenSource = new (); | ||||||
| 	 | 	 | ||||||
| 	internal ThrottledTaskBase(Log log) { | 	internal ThrottledTaskBase(Log log, TimeSpan delayBetweenRuns) { | ||||||
| 		this.log = log; | 		this.log = log; | ||||||
|  | 		this.delayBetweenRuns = delayBetweenRuns; | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	protected async Task ReaderTask() { | 	protected async Task ReaderTask() { | ||||||
| @@ -33,9 +35,9 @@ public abstract class ThrottledTaskBase<T> : IDisposable { | |||||||
| 				} catch (Exception e) { | 				} catch (Exception e) { | ||||||
| 					log.Error("Caught exception in task: " + e); | 					log.Error("Caught exception in task: " + e); | ||||||
| 				} | 				} | ||||||
|  | 				 | ||||||
|  | 				await Task.Delay(delayBetweenRuns, cancellationToken); | ||||||
| 			} | 			} | ||||||
| 		} catch (OperationCanceledException) { |  | ||||||
| 			// Ignore. |  | ||||||
| 		} finally { | 		} finally { | ||||||
| 			cancellationTokenSource.Dispose(); | 			cancellationTokenSource.Dispose(); | ||||||
| 		} | 		} | ||||||
| @@ -48,20 +50,20 @@ public abstract class ThrottledTaskBase<T> : IDisposable { | |||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	public void Dispose() { | 	public void Dispose() { | ||||||
| 		taskChannel.Writer.Complete(); |  | ||||||
| 		cancellationTokenSource.Cancel(); | 		cancellationTokenSource.Cancel(); | ||||||
|  | 		taskChannel.Writer.Complete(); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| public sealed class ThrottledTask : ThrottledTaskBase<Task> { | public sealed class ThrottledTask : ThrottledTaskBase<Task> { | ||||||
| 	private readonly Action resultProcessor; | 	private readonly Func<Task> resultProcessor; | ||||||
| 	private readonly TaskScheduler resultScheduler; | 	private readonly TaskScheduler resultScheduler; | ||||||
| 	 | 	 | ||||||
| 	public ThrottledTask(Log log, Action resultProcessor, TaskScheduler resultScheduler) : base(log) { | 	public ThrottledTask(Log log, Func<Task> resultProcessor, TimeSpan delayBetweenRuns, TaskScheduler resultScheduler) : base(log, delayBetweenRuns) { | ||||||
| 		this.resultProcessor = resultProcessor; | 		this.resultProcessor = resultProcessor; | ||||||
| 		this.resultScheduler = resultScheduler; | 		this.resultScheduler = resultScheduler; | ||||||
| 		 | 		 | ||||||
| 		Task.Run(ReaderTask); | 		_ = ReaderTask(); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	protected override async Task Run(Func<CancellationToken, Task> func, CancellationToken cancellationToken) { | 	protected override async Task Run(Func<CancellationToken, Task> func, CancellationToken cancellationToken) { | ||||||
| @@ -71,18 +73,22 @@ public sealed class ThrottledTask : ThrottledTaskBase<Task> { | |||||||
| } | } | ||||||
|  |  | ||||||
| public sealed class ThrottledTask<T> : ThrottledTaskBase<Task<T>> { | public sealed class ThrottledTask<T> : ThrottledTaskBase<Task<T>> { | ||||||
| 	private readonly Action<T> resultProcessor; | 	private readonly Func<T, Task> resultProcessor; | ||||||
| 	private readonly TaskScheduler resultScheduler; | 	private readonly TaskScheduler resultScheduler; | ||||||
| 	 | 	 | ||||||
| 	public ThrottledTask(Log log, Action<T> resultProcessor, TaskScheduler resultScheduler) : base(log) { | 	public ThrottledTask(Log log, Func<T, Task> resultProcessor, TimeSpan delayBetweenRuns, TaskScheduler resultScheduler) : base(log, delayBetweenRuns) { | ||||||
| 		this.resultProcessor = resultProcessor; | 		this.resultProcessor = resultProcessor; | ||||||
| 		this.resultScheduler = resultScheduler; | 		this.resultScheduler = resultScheduler; | ||||||
| 		 | 		 | ||||||
| 		Task.Run(ReaderTask); | 		_ = ReaderTask(); | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	protected override async Task Run(Func<CancellationToken, Task<T>> func, CancellationToken cancellationToken) { | 	protected override async Task Run(Func<CancellationToken, Task<T>> func, CancellationToken cancellationToken) { | ||||||
| 		T result = await func(cancellationToken); | 		T result = await func(cancellationToken); | ||||||
| 		await Task.Factory.StartNew(() => resultProcessor(result), cancellationToken, TaskCreationOptions.None, resultScheduler); | 		await Task.Factory.StartNew(() => resultProcessor(result), cancellationToken, TaskCreationOptions.None, resultScheduler); | ||||||
| 	} | 	} | ||||||
|  | 	 | ||||||
|  | 	public void Post(T result) { | ||||||
|  | 		base.Post(_ => Task.FromResult(result)); | ||||||
|  | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -16,7 +16,6 @@ | |||||||
|    |    | ||||||
|   <ItemGroup> |   <ItemGroup> | ||||||
|     <PackageReference Include="JetBrains.Annotations" Version="2023.2.0" /> |     <PackageReference Include="JetBrains.Annotations" Version="2023.2.0" /> | ||||||
|     <PackageReference Include="System.Reactive" Version="6.0.0" /> |  | ||||||
|   </ItemGroup> |   </ItemGroup> | ||||||
|    |    | ||||||
|   <ItemGroup> |   <ItemGroup> | ||||||
|   | |||||||
| @@ -8,5 +8,5 @@ using DHT.Utils; | |||||||
| namespace DHT.Utils; | namespace DHT.Utils; | ||||||
|  |  | ||||||
| static class Version { | static class Version { | ||||||
| 	public const string Tag = "47.1.0.0"; | 	public const string Tag = "47.2.0.0"; | ||||||
| } | } | ||||||
|   | |||||||
| @@ -22,7 +22,6 @@ define('LATEST_VERSION', $version === false ? '_' : $version); | |||||||
|       <h1>Discord History Tracker <span class="bar">|</span> <span class="notes"><a href="https://github.com/chylex/Discord-History-Tracker/releases">Release Notes</a></span></h1> |       <h1>Discord History Tracker <span class="bar">|</span> <span class="notes"><a href="https://github.com/chylex/Discord-History-Tracker/releases">Release Notes</a></span></h1> | ||||||
|       <p>Discord History Tracker lets you save chat history in your servers, groups, and private conversations, and view it offline.</p> |       <p>Discord History Tracker lets you save chat history in your servers, groups, and private conversations, and view it offline.</p> | ||||||
|       <img src="img/tracker.png" width="851" class="dht bordered"> |       <img src="img/tracker.png" width="851" class="dht bordered"> | ||||||
|       <p>This page explains how to use the desktop app. If you are looking for the older version of Discord History Tracker which only needs a browser or the Discord app, visit the page for the <a href="https://dht.chylex.com/browser-only">browser-only version</a>, however keep in mind that this version has <strong>significant limitations and fewer features</strong>.</p> |  | ||||||
|        |        | ||||||
|       <h2>How to Use</h2> |       <h2>How to Use</h2> | ||||||
|       <p>Download the latest version of the desktop app here, or visit <a href="https://github.com/chylex/Discord-History-Tracker/releases">All Releases</a> for older versions and release notes.</p> |       <p>Download the latest version of the desktop app here, or visit <a href="https://github.com/chylex/Discord-History-Tracker/releases">All Releases</a> for older versions and release notes.</p> | ||||||
| @@ -71,9 +70,9 @@ define('LATEST_VERSION', $version === false ? '_' : $version); | |||||||
|        |        | ||||||
|       <h2>External Links</h2> |       <h2>External Links</h2> | ||||||
|       <p class="links"> |       <p class="links"> | ||||||
|         <a href="https://github.com/chylex/Discord-History-Tracker/issues">Issues & Suggestions</a>  —  |         <a href="https://github.com/chylex/Discord-History-Tracker/issues">Issues & Suggestions</a>  •  | ||||||
|         <a href="https://github.com/chylex/Discord-History-Tracker">Source Code</a>  —  |         <a href="https://github.com/chylex/Discord-History-Tracker">Source Code</a>  •  | ||||||
|         <a href="https://twitter.com/chylexmc">Follow Dev on Twitter</a>  —  |         <a href="https://chylex.com">Developer's Website</a>  •  | ||||||
|         <a href="https://ko-fi.com/chylex">Support via Ko-fi</a> |         <a href="https://ko-fi.com/chylex">Support via Ko-fi</a> | ||||||
|       </p> |       </p> | ||||||
|        |        | ||||||
|   | |||||||
| @@ -165,7 +165,7 @@ code { | |||||||
|  |  | ||||||
| .downloads svg { | .downloads svg { | ||||||
|   margin: 1px 4px; |   margin: 1px 4px; | ||||||
|   vertical-align: -25%; |   vertical-align: -30%; | ||||||
| } | } | ||||||
|  |  | ||||||
| .downloads svg.icon-large { | .downloads svg.icon-large { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user