From 1c67da607cbccfd5970e5b4f02f7fb1a738299db Mon Sep 17 00:00:00 2001 From: Lightczx <1686188646@qq.com> Date: Mon, 4 Dec 2023 16:09:28 +0800 Subject: [PATCH] Add http sharding --- .../IO/Http/Sharding/HttpShardCopyWorker.cs | 124 ++++++++++++++++++ .../Snap.Hutao/Core/IO/StreamCopyWorker.cs | 53 +------- .../Request/Builder/HttpClientExtension.cs | 24 ++++ 3 files changed, 151 insertions(+), 50 deletions(-) create mode 100644 src/Snap.Hutao/Snap.Hutao/Core/IO/Http/Sharding/HttpShardCopyWorker.cs create mode 100644 src/Snap.Hutao/Snap.Hutao/Web/Request/Builder/HttpClientExtension.cs diff --git a/src/Snap.Hutao/Snap.Hutao/Core/IO/Http/Sharding/HttpShardCopyWorker.cs b/src/Snap.Hutao/Snap.Hutao/Core/IO/Http/Sharding/HttpShardCopyWorker.cs new file mode 100644 index 00000000..cd40349b --- /dev/null +++ b/src/Snap.Hutao/Snap.Hutao/Core/IO/Http/Sharding/HttpShardCopyWorker.cs @@ -0,0 +1,124 @@ +// Copyright (c) DGP Studio. All rights reserved. +// Licensed under the MIT license. + +using Microsoft.Win32.SafeHandles; +using Snap.Hutao.Web.Request.Builder; +using System.IO; +using System.Net.Http; + +namespace Snap.Hutao.Core.IO.Http.Sharding; + +internal sealed class HttpShardCopyWorker : IDisposable +{ + private const int ShardSize = 4 * 1024 * 1024; + + private readonly HttpClient httpClient; + private readonly string sourceUrl; + private readonly int bufferSize; + private readonly SafeFileHandle destFileHandle; + private readonly List shards; + + private HttpShardCopyWorker(HttpClient httpClient, string sourceUrl, string destFilePath, long contentLength, int bufferSize) + { + this.httpClient = httpClient; + this.sourceUrl = sourceUrl; + this.bufferSize = bufferSize; + + destFileHandle = File.OpenHandle( + destFilePath, + FileMode.Create, + FileAccess.Write, + FileShare.None, + FileOptions.RandomAccess | FileOptions.Asynchronous, + contentLength); + + shards = CalculateShards(contentLength); + } + + public static async ValueTask CreateAsync(HttpClient httpClient, string sourceUrl, string destFilePath, int bufferSize = 81920) + { + HttpResponseMessage response = await httpClient.HeadAsync(sourceUrl, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + + long contentLength = response.Content.Headers.ContentLength ?? 0; + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(contentLength); + + return new(httpClient, sourceUrl, destFilePath, contentLength, bufferSize); + } + + [SuppressMessage("", "SH003")] + public Task CopyAsync(CancellationToken cancellationToken = default) + { + return Parallel.ForEachAsync(shards, cancellationToken, CopyShardAsync); + } + + public void Dispose() + { + destFileHandle.Dispose(); + } + + private static List CalculateShards(long contentLength) + { + List shards = []; + long currentOffset = 0; + + while (currentOffset < contentLength) + { + long end = Math.Min(currentOffset + ShardSize, contentLength) - 1; + shards.Add(new Shard(currentOffset, end)); + currentOffset = end + 1; + } + + return shards; + } + + private async ValueTask CopyShardAsync(Shard shard, CancellationToken token) + { + HttpRequestMessage request = new(HttpMethod.Get, sourceUrl) + { + Headers = + { + Range = new(shard.StartOffset, shard.EndOffset), + }, + }; + + using (request) + { + using (HttpResponseMessage response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token).ConfigureAwait(false)) + { + response.EnsureSuccessStatusCode(); + + Memory buffer = new byte[bufferSize]; + using (Stream stream = await response.Content.ReadAsStreamAsync(token).ConfigureAwait(false)) + { + int streamOffset = 0; + do + { + int bytesRead = await stream.ReadAsync(buffer, token).ConfigureAwait(false); + if (bytesRead <= 0) + { + break; + } + + await RandomAccess.WriteAsync(destFileHandle, buffer[..bytesRead], shard.StartOffset + streamOffset, token).ConfigureAwait(false); + streamOffset += bytesRead; + } + while (true); + } + } + } + } + + private sealed class Shard + { + public Shard(long startOffset, long endOffset) + { + StartOffset = startOffset; + EndOffset = endOffset; + } + + public long StartOffset { get; } + + public long EndOffset { get; } + } +} \ No newline at end of file diff --git a/src/Snap.Hutao/Snap.Hutao/Core/IO/StreamCopyWorker.cs b/src/Snap.Hutao/Snap.Hutao/Core/IO/StreamCopyWorker.cs index 022ac66d..619b81b5 100644 --- a/src/Snap.Hutao/Snap.Hutao/Core/IO/StreamCopyWorker.cs +++ b/src/Snap.Hutao/Snap.Hutao/Core/IO/StreamCopyWorker.cs @@ -5,63 +5,16 @@ using System.IO; namespace Snap.Hutao.Core.IO; -/// -/// 流复制器 -/// -internal sealed class StreamCopyWorker +internal sealed class StreamCopyWorker : StreamCopyWorker { - private readonly Stream source; - private readonly Stream destination; - private readonly long totalBytes; - private readonly int bufferSize; - - /// - /// 创建一个新的流复制器 - /// - /// 源 - /// 目标 - /// 总字节 - /// 字节尺寸 public StreamCopyWorker(Stream source, Stream destination, long totalBytes, int bufferSize = 81920) + : base(source, destination, totalBytes => new StreamCopyStatus(totalBytes, totalBytes), bufferSize) { - Verify.Operation(source.CanRead, "Source Stream can't read"); - Verify.Operation(destination.CanWrite, "Destination Stream can't write"); - - this.source = source; - this.destination = destination; - this.totalBytes = totalBytes; - this.bufferSize = bufferSize; - } - - /// - /// 异步复制 - /// - /// 进度 - /// 任务 - public async ValueTask CopyAsync(IProgress progress) - { - long totalBytesRead = 0; - int bytesRead; - Memory buffer = new byte[bufferSize]; - - do - { - bytesRead = await source.ReadAsync(buffer).ConfigureAwait(false); - await destination.WriteAsync(buffer[..bytesRead]).ConfigureAwait(false); - - totalBytesRead += bytesRead; - progress.Report(new(totalBytesRead, totalBytes)); - } - while (bytesRead > 0); } } -/// -/// 针对特定类型的流复制器 -/// -/// 进度类型 [SuppressMessage("", "SA1402")] -internal sealed class StreamCopyWorker +internal class StreamCopyWorker { private readonly Stream source; private readonly Stream destination; diff --git a/src/Snap.Hutao/Snap.Hutao/Web/Request/Builder/HttpClientExtension.cs b/src/Snap.Hutao/Snap.Hutao/Web/Request/Builder/HttpClientExtension.cs new file mode 100644 index 00000000..78fc44e3 --- /dev/null +++ b/src/Snap.Hutao/Snap.Hutao/Web/Request/Builder/HttpClientExtension.cs @@ -0,0 +1,24 @@ +// Copyright (c) DGP Studio. All rights reserved. +// Licensed under the MIT license. + +using System.Net.Http; +using System.Runtime.CompilerServices; + +namespace Snap.Hutao.Web.Request.Builder; + +internal static class HttpClientExtension +{ + [SuppressMessage("", "SH003")] + public static Task HeadAsync(this HttpClient httpClient, [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri, HttpCompletionOption completionOption) + { + return httpClient.SendAsync(PrivateCreateRequestMessage(httpClient, HttpMethod.Get, CreateUri(default!, requestUri)), completionOption, CancellationToken.None); + } + + // private HttpRequestMessage CreateRequestMessage(HttpMethod method, Uri? uri) + [UnsafeAccessor(UnsafeAccessorKind.Method, Name = "CreateRequestMessage")] + private static extern HttpRequestMessage PrivateCreateRequestMessage(HttpClient httpClient, HttpMethod method, Uri? uri); + + // private static Uri? CreateUri(string? uri) + [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "CreateUri")] + private static extern Uri? CreateUri(HttpClient discard, string? uri); +} \ No newline at end of file